Skip to content

Commit 392a3df

Browse files
authored
fix(hermes): ignore broadcast send result (#1450)
Sending over the broadcast channel only fails when there are no receivers. We should ignore it instead of propagating it.
1 parent a607335 commit 392a3df

File tree

2 files changed

+23
-19
lines changed

2 files changed

+23
-19
lines changed

hermes/src/aggregate.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -264,23 +264,22 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
264264
// Update the aggregate state
265265
let mut aggregate_state = state.aggregate_state.write().await;
266266

267-
// Check if the update is new or out of order
268-
match aggregate_state.latest_completed_slot {
267+
// Send update event to subscribers. We are purposefully ignoring the result
268+
// because there might be no subscribers.
269+
let _ = match aggregate_state.latest_completed_slot {
269270
None => {
270271
aggregate_state.latest_completed_slot.replace(slot);
271-
state.api_update_tx.send(AggregationEvent::New { slot })?;
272+
state.api_update_tx.send(AggregationEvent::New { slot })
272273
}
273274
Some(latest) if slot > latest => {
274275
state.prune_removed_keys(message_state_keys).await;
275276
aggregate_state.latest_completed_slot.replace(slot);
276-
state.api_update_tx.send(AggregationEvent::New { slot })?;
277+
state.api_update_tx.send(AggregationEvent::New { slot })
277278
}
278-
_ => {
279-
state
280-
.api_update_tx
281-
.send(AggregationEvent::OutOfOrder { slot })?;
282-
}
283-
}
279+
_ => state
280+
.api_update_tx
281+
.send(AggregationEvent::OutOfOrder { slot }),
282+
};
284283

285284
aggregate_state.latest_completed_slot = aggregate_state
286285
.latest_completed_slot

hermes/src/network/wormhole.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,10 @@ pub async fn process_message(state: Arc<State>, vaa_bytes: Vec<u8>) -> Result<()
225225
)?;
226226

227227
// Finally, store the resulting VAA in Hermes.
228-
store_vaa(state.clone(), vaa.sequence, vaa_bytes).await?;
228+
let sequence = vaa.sequence;
229+
tokio::spawn(async move {
230+
store_vaa(state.clone(), sequence, vaa_bytes).await;
231+
});
229232

230233
Ok(())
231234
}
@@ -334,22 +337,24 @@ pub fn verify_vaa<'a>(
334337
}
335338

336339
#[tracing::instrument(skip(state, vaa_bytes))]
337-
pub async fn store_vaa(state: Arc<State>, sequence: u64, vaa_bytes: Vec<u8>) -> Result<()> {
340+
pub async fn store_vaa(state: Arc<State>, sequence: u64, vaa_bytes: Vec<u8>) {
338341
// Check VAA hasn't already been seen, this may have been checked previously
339-
// but due to async nature It's possible other threads have mutated the state
342+
// but due to async nature it's possible other threads have mutated the state
340343
// since this VAA started processing.
341344
let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await;
342-
ensure!(
343-
!observed_vaa_seqs.contains(&sequence),
344-
"Previously observed VAA: {}",
345-
sequence,
346-
);
345+
if observed_vaa_seqs.contains(&sequence) {
346+
return;
347+
}
347348

348349
// Clear old cached VAA sequences.
349350
while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
350351
observed_vaa_seqs.pop_first();
351352
}
352353

353354
// Hand the VAA to the aggregate store.
354-
crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await
355+
if let Err(e) =
356+
crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await
357+
{
358+
tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
359+
}
355360
}

0 commit comments

Comments
 (0)