diff --git a/crates/sui-bridge/src/monitor.rs b/crates/sui-bridge/src/monitor.rs index 09cb914c93a11..53bfbc92da6dc 100644 --- a/crates/sui-bridge/src/monitor.rs +++ b/crates/sui-bridge/src/monitor.rs @@ -1,8 +1,10 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -//! `BridgeMonitor` receives all `SuiBridgeEvent` and handles them accordingly. +//! `BridgeMonitor` receives all `SuiBridgeEvent` and `EthBridgeEvent` +//! and handles them accordingly. +use crate::abi::EthBridgeEvent; use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator; use crate::crypto::BridgeAuthorityPublicKeyBytes; use crate::events::{BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent}; @@ -22,7 +24,8 @@ const REFRESH_BRIDGE_RETRY_TIMES: u64 = 3; pub struct BridgeMonitor { sui_client: Arc>, - monitor_rx: mysten_metrics::metered_channel::Receiver, + sui_monitor_rx: mysten_metrics::metered_channel::Receiver, + eth_monitor_rx: mysten_metrics::metered_channel::Receiver, bridge_auth_agg: Arc>, bridge_paused_watch_tx: tokio::sync::watch::Sender, sui_token_type_tags: Arc>>, @@ -35,7 +38,8 @@ where { pub fn new( sui_client: Arc>, - monitor_rx: mysten_metrics::metered_channel::Receiver, + sui_monitor_rx: mysten_metrics::metered_channel::Receiver, + eth_monitor_rx: mysten_metrics::metered_channel::Receiver, bridge_auth_agg: Arc>, bridge_paused_watch_tx: tokio::sync::watch::Sender, sui_token_type_tags: Arc>>, @@ -43,7 +47,8 @@ where ) -> Self { Self { sui_client, - monitor_rx, + sui_monitor_rx, + eth_monitor_rx, bridge_auth_agg, bridge_paused_watch_tx, sui_token_type_tags, @@ -55,7 +60,8 @@ where tracing::info!("Starting BridgeMonitor"); let Self { sui_client, - mut monitor_rx, + mut sui_monitor_rx, + mut eth_monitor_rx, bridge_auth_agg, bridge_paused_watch_tx, sui_token_type_tags, @@ -63,100 +69,134 @@ where } = self; let mut latest_token_config = (*sui_token_type_tags.load().clone()).clone(); - while let Some(events) = monitor_rx.recv().await { - match events { - SuiBridgeEvent::SuiToEthTokenBridgeV1(_) => (), - SuiBridgeEvent::TokenTransferApproved(_) => (), - SuiBridgeEvent::TokenTransferClaimed(_) => (), - SuiBridgeEvent::TokenTransferAlreadyApproved(_) => (), - SuiBridgeEvent::TokenTransferAlreadyClaimed(_) => (), - SuiBridgeEvent::TokenTransferLimitExceed(_) => { - // TODO do we want to do anything here? + loop { + tokio::select! { + sui_event = sui_monitor_rx.recv() => { + if let Some(sui_event) = sui_event { + Self::handle_sui_events( + sui_event, + &sui_client, + &bridge_auth_agg, + &bridge_paused_watch_tx, + &sui_token_type_tags, + &bridge_metrics, + &mut latest_token_config, + ) + .await; + } else { + panic!("BridgeMonitor sui events channel was closed unexpectedly"); + } } - - SuiBridgeEvent::EmergencyOpEvent(event) => { - info!("Received EmergencyOpEvent: {:?}", event); - bridge_metrics - .observed_governance_actions - .with_label_values(&["emergency_op", "sui"]) - .inc(); - let is_paused = get_latest_bridge_pause_status_with_emergency_event( - sui_client.clone(), - event, - Duration::from_secs(10), - ) - .await; - bridge_paused_watch_tx - .send(is_paused) - .expect("Bridge pause status watch channel should not be closed"); + eth_event = eth_monitor_rx.recv() => { + if let Some(_eth_event) = eth_event { + // TODO + } else { + panic!("BridgeMonitor eth events channel was closed unexpectedly"); + } } + } + } + } - SuiBridgeEvent::CommitteeMemberRegistration(_) => (), - SuiBridgeEvent::CommitteeUpdateEvent(_) => (), - - SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => { - info!("Received CommitteeMemberUrlUpdateEvent: {:?}", event); - let new_committee = get_latest_bridge_committee_with_url_update_event( - sui_client.clone(), - event, - Duration::from_secs(10), - ) - .await; - bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new( - new_committee, - )))); - info!("Committee updated with CommitteeMemberUrlUpdateEvent"); - } + async fn handle_sui_events( + event: SuiBridgeEvent, + sui_client: &Arc>, + bridge_auth_agg: &Arc>, + bridge_paused_watch_tx: &tokio::sync::watch::Sender, + sui_token_type_tags: &Arc>>, + bridge_metrics: &Arc, + latest_token_config: &mut HashMap, + ) { + match event { + SuiBridgeEvent::SuiToEthTokenBridgeV1(_) => (), + SuiBridgeEvent::TokenTransferApproved(_) => (), + SuiBridgeEvent::TokenTransferClaimed(_) => (), + SuiBridgeEvent::TokenTransferAlreadyApproved(_) => (), + SuiBridgeEvent::TokenTransferAlreadyClaimed(_) => (), + SuiBridgeEvent::TokenTransferLimitExceed(_) => { + // TODO do we want to do anything here? + } - SuiBridgeEvent::BlocklistValidatorEvent(event) => { - info!("Received BlocklistValidatorEvent: {:?}", event); - bridge_metrics - .observed_governance_actions - .with_label_values(&["blocklist_validator", "sui"]) - .inc(); - let new_committee = get_latest_bridge_committee_with_blocklist_event( - sui_client.clone(), - event, - Duration::from_secs(10), - ) - .await; - bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new( - new_committee, - )))); - info!("Committee updated with BlocklistValidatorEvent"); - } + SuiBridgeEvent::EmergencyOpEvent(event) => { + info!("Received EmergencyOpEvent: {:?}", event); + bridge_metrics + .observed_governance_actions + .with_label_values(&["emergency_op", "sui"]) + .inc(); + let is_paused = get_latest_bridge_pause_status_with_emergency_event( + sui_client.clone(), + event, + Duration::from_secs(10), + ) + .await; + bridge_paused_watch_tx + .send(is_paused) + .expect("Bridge pause status watch channel should not be closed"); + } - SuiBridgeEvent::TokenRegistrationEvent(_) => (), - - SuiBridgeEvent::NewTokenEvent(event) => { - info!("Received NewTokenEvent: {:?}", event); - bridge_metrics - .observed_governance_actions - .with_label_values(&["new_token", "sui"]) - .inc(); - if let std::collections::hash_map::Entry::Vacant(entry) = - // We only add new tokens but not remove so it's ok to just insert - latest_token_config.entry(event.token_id) - { - entry.insert(event.type_name.clone()); - sui_token_type_tags.store(Arc::new(latest_token_config.clone())); - } else { - // invariant - assert_eq!(event.type_name, latest_token_config[&event.token_id]); - } - } + SuiBridgeEvent::CommitteeMemberRegistration(_) => (), + SuiBridgeEvent::CommitteeUpdateEvent(_) => (), + + SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => { + info!("Received CommitteeMemberUrlUpdateEvent: {:?}", event); + let new_committee = get_latest_bridge_committee_with_url_update_event( + sui_client.clone(), + event, + Duration::from_secs(10), + ) + .await; + bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new( + new_committee, + )))); + info!("Committee updated with CommitteeMemberUrlUpdateEvent"); + } + + SuiBridgeEvent::BlocklistValidatorEvent(event) => { + info!("Received BlocklistValidatorEvent: {:?}", event); + bridge_metrics + .observed_governance_actions + .with_label_values(&["blocklist_validator", "sui"]) + .inc(); + let new_committee = get_latest_bridge_committee_with_blocklist_event( + sui_client.clone(), + event, + Duration::from_secs(10), + ) + .await; + bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new( + new_committee, + )))); + info!("Committee updated with BlocklistValidatorEvent"); + } + + SuiBridgeEvent::TokenRegistrationEvent(_) => (), - SuiBridgeEvent::UpdateTokenPriceEvent(event) => { - info!("Received UpdateTokenPriceEvent: {:?}", event); - bridge_metrics - .observed_governance_actions - .with_label_values(&["update_token_price", "sui"]) - .inc(); + SuiBridgeEvent::NewTokenEvent(event) => { + info!("Received NewTokenEvent: {:?}", event); + bridge_metrics + .observed_governance_actions + .with_label_values(&["new_token", "sui"]) + .inc(); + if let std::collections::hash_map::Entry::Vacant(entry) = + // We only add new tokens but not remove so it's ok to just insert + latest_token_config.entry(event.token_id) + { + entry.insert(event.type_name.clone()); + sui_token_type_tags.store(Arc::new(latest_token_config.clone())); + } else { + // invariant + assert_eq!(event.type_name, latest_token_config[&event.token_id]); } } - } - panic!("BridgeMonitor channel was closed unexpectedly"); + SuiBridgeEvent::UpdateTokenPriceEvent(event) => { + info!("Received UpdateTokenPriceEvent: {:?}", event); + bridge_metrics + .observed_governance_actions + .with_label_values(&["update_token_price", "sui"]) + .inc(); + } + } } } @@ -742,8 +782,10 @@ mod tests { #[tokio::test] async fn test_update_bridge_authority_aggregation_with_url_change_event() { let ( - monitor_tx, - monitor_rx, + sui_monitor_tx, + sui_monitor_rx, + _eth_monitor_tx, + eth_monitor_rx, sui_client_mock, sui_client, bridge_pause_tx, @@ -759,7 +801,8 @@ mod tests { let _handle = tokio::task::spawn( BridgeMonitor::new( sui_client.clone(), - monitor_rx, + sui_monitor_rx, + eth_monitor_rx, agg.clone(), bridge_pause_tx, sui_token_type_tags, @@ -773,7 +816,7 @@ mod tests { let new_committee_summary = bridge_committee_to_bridge_committee_summary(new_committee.clone()); sui_client_mock.set_bridge_committee(new_committee_summary.clone()); - monitor_tx + sui_monitor_tx .send(SuiBridgeEvent::CommitteeMemberUrlUpdateEvent( CommitteeMemberUrlUpdateEvent { member: authorities[0].pubkey.clone(), @@ -798,8 +841,10 @@ mod tests { #[tokio::test] async fn test_update_bridge_authority_aggregation_with_blocklist_event() { let ( - monitor_tx, - monitor_rx, + sui_monitor_tx, + sui_monitor_rx, + _eth_monitor_tx, + eth_monitor_rx, sui_client_mock, sui_client, bridge_pause_tx, @@ -815,7 +860,8 @@ mod tests { let _handle = tokio::task::spawn( BridgeMonitor::new( sui_client.clone(), - monitor_rx, + sui_monitor_rx, + eth_monitor_rx, agg.clone(), bridge_pause_tx, sui_token_type_tags, @@ -829,7 +875,7 @@ mod tests { let new_committee_summary = bridge_committee_to_bridge_committee_summary(new_committee.clone()); sui_client_mock.set_bridge_committee(new_committee_summary.clone()); - monitor_tx + sui_monitor_tx .send(SuiBridgeEvent::BlocklistValidatorEvent( BlocklistValidatorEvent { public_keys: vec![to_blocklist.pubkey.clone()], @@ -852,8 +898,10 @@ mod tests { #[tokio::test] async fn test_update_bridge_pause_status_with_emergency_event() { let ( - monitor_tx, - monitor_rx, + sui_monitor_tx, + sui_monitor_rx, + _eth_monitor_tx, + eth_monitor_rx, sui_client_mock, sui_client, bridge_pause_tx, @@ -872,7 +920,8 @@ mod tests { let _handle = tokio::task::spawn( BridgeMonitor::new( sui_client.clone(), - monitor_rx, + sui_monitor_rx, + eth_monitor_rx, agg.clone(), bridge_pause_tx, sui_token_type_tags, @@ -882,7 +931,7 @@ mod tests { ); sui_client_mock.set_is_bridge_paused(event.frozen); - monitor_tx + sui_monitor_tx .send(SuiBridgeEvent::EmergencyOpEvent(event.clone())) .await .unwrap(); @@ -895,8 +944,10 @@ mod tests { #[tokio::test] async fn test_update_sui_token_type_tags() { let ( - monitor_tx, - monitor_rx, + sui_monitor_tx, + sui_monitor_rx, + _eth_monitor_tx, + eth_monitor_rx, _sui_client_mock, sui_client, bridge_pause_tx, @@ -920,7 +971,8 @@ mod tests { let _handle = tokio::task::spawn( BridgeMonitor::new( sui_client.clone(), - monitor_rx, + sui_monitor_rx, + eth_monitor_rx, agg.clone(), bridge_pause_tx, sui_token_type_tags_clone, @@ -929,7 +981,7 @@ mod tests { .run(), ); - monitor_tx + sui_monitor_tx .send(SuiBridgeEvent::NewTokenEvent(event.clone())) .await .unwrap(); @@ -947,6 +999,8 @@ mod tests { fn setup() -> ( mysten_metrics::metered_channel::Sender, mysten_metrics::metered_channel::Receiver, + mysten_metrics::metered_channel::Sender, + mysten_metrics::metered_channel::Receiver, SuiMockClient, Arc>, tokio::sync::watch::Sender, @@ -962,13 +1016,21 @@ mod tests { let sui_client_mock = SuiMockClient::default(); let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone())); - let (monitor_tx, monitor_rx) = mysten_metrics::metered_channel::channel( + let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel( 10000, &mysten_metrics::get_metrics() .unwrap() .channel_inflight - .with_label_values(&["monitor_queue"]), + .with_label_values(&["sui_monitor_queue"]), ); + let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel( + 10000, + &mysten_metrics::get_metrics() + .unwrap() + .channel_inflight + .with_label_values(&["eth_monitor_queue"]), + ); + let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false); let authorities = vec![ get_test_authority_and_key(2500, 0 /* port, dummy value */).0, @@ -977,8 +1039,10 @@ mod tests { get_test_authority_and_key(2500, 0 /* port, dummy value */).0, ]; ( - monitor_tx, - monitor_rx, + sui_monitor_tx, + sui_monitor_rx, + eth_monitor_tx, + eth_monitor_rx, sui_client_mock, sui_client, bridge_pause_tx, diff --git a/crates/sui-bridge/src/node.rs b/crates/sui-bridge/src/node.rs index 517aa496d3331..01d77aab2ae3a 100644 --- a/crates/sui-bridge/src/node.rs +++ b/crates/sui-bridge/src/node.rs @@ -119,13 +119,21 @@ async fn start_client_components( let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(is_bridge_paused); - let (monitor_tx, monitor_rx) = mysten_metrics::metered_channel::channel( + let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel( 10000, &mysten_metrics::get_metrics() .unwrap() .channel_inflight - .with_label_values(&["monitor_queue"]), + .with_label_values(&["sui_monitor_queue"]), ); + let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel( + 10000, + &mysten_metrics::get_metrics() + .unwrap() + .channel_inflight + .with_label_values(&["eth_monitor_queue"]), + ); + let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(sui_token_type_tags))); let bridge_action_executor = BridgeActionExecutor::new( sui_client.clone(), @@ -142,7 +150,8 @@ async fn start_client_components( let monitor = BridgeMonitor::new( sui_client.clone(), - monitor_rx, + sui_monitor_rx, + eth_monitor_rx, bridge_auth_agg.clone(), bridge_pause_tx, sui_token_type_tags, @@ -155,7 +164,8 @@ async fn start_client_components( sui_events_rx, eth_events_rx, store.clone(), - monitor_tx, + sui_monitor_tx, + eth_monitor_tx, metrics, ); diff --git a/crates/sui-bridge/src/orchestrator.rs b/crates/sui-bridge/src/orchestrator.rs index 247f5b13ea425..dcd89f7b8eef7 100644 --- a/crates/sui-bridge/src/orchestrator.rs +++ b/crates/sui-bridge/src/orchestrator.rs @@ -29,7 +29,8 @@ pub struct BridgeOrchestrator { sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec)>, eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec)>, store: Arc, - monitor_tx: mysten_metrics::metered_channel::Sender, + sui_monitor_tx: mysten_metrics::metered_channel::Sender, + eth_monitor_tx: mysten_metrics::metered_channel::Sender, metrics: Arc, } @@ -42,7 +43,8 @@ where sui_events_rx: mysten_metrics::metered_channel::Receiver<(Identifier, Vec)>, eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec)>, store: Arc, - monitor_tx: mysten_metrics::metered_channel::Sender, + sui_monitor_tx: mysten_metrics::metered_channel::Sender, + eth_monitor_tx: mysten_metrics::metered_channel::Sender, metrics: Arc, ) -> Self { Self { @@ -50,7 +52,8 @@ where sui_events_rx, eth_events_rx, store, - monitor_tx, + sui_monitor_tx, + eth_monitor_tx, metrics, } } @@ -72,7 +75,7 @@ where store_clone, executor_sender_clone, self.sui_events_rx, - self.monitor_tx, + self.sui_monitor_tx, metrics_clone, ))); let store_clone = self.store.clone(); @@ -93,6 +96,7 @@ where store_clone, executor_sender, self.eth_events_rx, + self.eth_monitor_tx, metrics_clone, ))); @@ -193,6 +197,7 @@ where u64, Vec, )>, + eth_monitor_tx: mysten_metrics::metered_channel::Sender, metrics: Arc, ) { info!("Starting eth watcher task"); @@ -226,6 +231,12 @@ where let bridge_event = opt_bridge_event.unwrap(); info!("Observed Eth bridge event: {:?}", bridge_event); + // Send event to monitor + eth_monitor_tx + .send(bridge_event.clone()) + .await + .expect("Sending event to monitor channel should not fail"); + match bridge_event.try_into_bridge_action(log.tx_hash, log.log_index_in_tx) { Ok(Some(action)) => actions.push(action), Ok(None) => {} @@ -233,7 +244,6 @@ where error!(eth_tx_hash=?log.tx_hash, eth_event_index=?log.log_index_in_tx, "Error converting EthBridgeEvent to BridgeAction: {:?}", e); } } - // TODO: handle non Action events } if !actions.is_empty() { info!("Received {} actions from Eth: {:?}", actions.len(), actions); @@ -285,8 +295,10 @@ mod tests { sui_events_rx, _eth_events_tx, eth_events_rx, - monitor_tx, - _monitor_rx, + sui_monitor_tx, + _sui_monitor_rx, + eth_monitor_tx, + _eth_monitor_rx, sui_client, store, ) = setup(); @@ -299,7 +311,8 @@ mod tests { sui_events_rx, eth_events_rx, store.clone(), - monitor_tx, + sui_monitor_tx, + eth_monitor_tx, metrics, ) .run(executor) @@ -349,8 +362,10 @@ mod tests { sui_events_rx, eth_events_tx, eth_events_rx, - monitor_tx, - _monitor_rx, + sui_monitor_tx, + _sui_monitor_rx, + eth_monitor_tx, + _eth_monitor_rx, sui_client, store, ) = setup(); @@ -363,7 +378,8 @@ mod tests { sui_events_rx, eth_events_rx, store.clone(), - monitor_tx, + sui_monitor_tx, + eth_monitor_tx, metrics, ) .run(executor) @@ -419,8 +435,10 @@ mod tests { sui_events_rx, _eth_events_tx, eth_events_rx, - monitor_tx, - _monitor_rx, + sui_monitor_tx, + _sui_monitor_rx, + eth_monitor_tx, + _eth_monitor_rx, sui_client, store, ) = setup(); @@ -449,7 +467,8 @@ mod tests { sui_events_rx, eth_events_rx, store.clone(), - monitor_tx, + sui_monitor_tx, + eth_monitor_tx, metrics, ) .run(executor) @@ -472,6 +491,8 @@ mod tests { mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec)>, mysten_metrics::metered_channel::Sender, mysten_metrics::metered_channel::Receiver, + mysten_metrics::metered_channel::Sender, + mysten_metrics::metered_channel::Receiver, SuiClient, Arc, ) { @@ -502,20 +523,29 @@ mod tests { .channel_inflight .with_label_values(&["unit_test_sui_events_queue"]), ); - let (monitor_tx, monitor_rx) = mysten_metrics::metered_channel::channel( + let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel( + 10000, + &mysten_metrics::get_metrics() + .unwrap() + .channel_inflight + .with_label_values(&["sui_monitor_queue"]), + ); + let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel( 10000, &mysten_metrics::get_metrics() .unwrap() .channel_inflight - .with_label_values(&["monitor_queue"]), + .with_label_values(&["eth_monitor_queue"]), ); ( sui_events_tx, sui_events_rx, eth_events_tx, eth_events_rx, - monitor_tx, - monitor_rx, + sui_monitor_tx, + sui_monitor_rx, + eth_monitor_tx, + eth_monitor_rx, sui_client, store, )