Skip to content

Commit f8d75a2

Browse files
Matthieu Vachonmaoueh
andauthored
node: fix a potential crash when Firehose provider is faulty (#3129)
When a provider is faulty, it's removed from the list of endpoints for this particular chain. This means later on, places where at least one valid provide must exists panicked, this is the case for the `FirehoseBlockIngestor`. As well as removing the faulty provider in `connect_firehose_networks`, we also completely remove the chain itself if at the end the chain has 0 providers. Alternative would be to keep the chain but make downstream users to deal with the fact that providers could be 0. I decided on removing at the source instead since a chain with 0 providers is like no chain at all since nothing could run. ### Renames I took the opportunity to trimmed down the structs where firehose providers and chain id to providers are stored. I was always finding hard to reason about those struct where there is a lot of repition in the name and the single fields. Hopefully it's a bit better now. I will highlight the bug fix in the diff. Co-authored-by: Matthieu Vachon <matt@streamingfast.io>
1 parent f480624 commit f8d75a2

File tree

5 files changed

+86
-72
lines changed

5 files changed

+86
-72
lines changed

chain/ethereum/src/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::{Context, Error};
22
use graph::blockchain::BlockchainKind;
33
use graph::data::subgraph::UnifiedMappingApiVersion;
4-
use graph::firehose::FirehoseNetworkEndpoints;
4+
use graph::firehose::FirehoseEndpoints;
55
use graph::prelude::{
66
EthereumBlock, EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, StopwatchMetrics,
77
};
@@ -70,7 +70,7 @@ pub struct Chain {
7070
name: String,
7171
node_id: NodeId,
7272
registry: Arc<dyn MetricsRegistry>,
73-
firehose_endpoints: Arc<FirehoseNetworkEndpoints>,
73+
firehose_endpoints: Arc<FirehoseEndpoints>,
7474
eth_adapters: Arc<EthereumNetworkAdapters>,
7575
ancestor_count: BlockNumber,
7676
chain_store: Arc<dyn ChainStore>,
@@ -96,7 +96,7 @@ impl Chain {
9696
chain_store: Arc<dyn ChainStore>,
9797
call_cache: Arc<dyn EthereumCallCache>,
9898
subgraph_store: Arc<dyn SubgraphStore>,
99-
firehose_endpoints: FirehoseNetworkEndpoints,
99+
firehose_endpoints: FirehoseEndpoints,
100100
eth_adapters: EthereumNetworkAdapters,
101101
chain_head_update_listener: Arc<dyn ChainHeadUpdateListener>,
102102
ancestor_count: BlockNumber,

chain/near/src/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use graph::blockchain::BlockchainKind;
22
use graph::cheap_clone::CheapClone;
33
use graph::data::subgraph::UnifiedMappingApiVersion;
4-
use graph::firehose::FirehoseNetworkEndpoints;
4+
use graph::firehose::FirehoseEndpoints;
55
use graph::prelude::StopwatchMetrics;
66
use graph::{
77
anyhow,
@@ -34,7 +34,7 @@ use graph::blockchain::block_stream::BlockStream;
3434
pub struct Chain {
3535
logger_factory: LoggerFactory,
3636
name: String,
37-
firehose_endpoints: Arc<FirehoseNetworkEndpoints>,
37+
firehose_endpoints: Arc<FirehoseEndpoints>,
3838
chain_store: Arc<dyn ChainStore>,
3939
}
4040

@@ -49,7 +49,7 @@ impl Chain {
4949
logger_factory: LoggerFactory,
5050
name: String,
5151
chain_store: Arc<dyn ChainStore>,
52-
firehose_endpoints: FirehoseNetworkEndpoints,
52+
firehose_endpoints: FirehoseEndpoints,
5353
) -> Self {
5454
Chain {
5555
logger_factory,

graph/src/firehose/endpoints.rs

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -141,45 +141,39 @@ impl FirehoseEndpoint {
141141
Ok(block_stream)
142142
}
143143
}
144-
145-
#[derive(Clone, Debug)]
146-
pub struct FirehoseNetworkEndpoint {
147-
endpoint: Arc<FirehoseEndpoint>,
148-
}
149-
150144
#[derive(Clone, Debug)]
151-
pub struct FirehoseNetworkEndpoints {
152-
pub endpoints: Vec<FirehoseNetworkEndpoint>,
153-
}
145+
pub struct FirehoseEndpoints(Vec<Arc<FirehoseEndpoint>>);
154146

155-
impl FirehoseNetworkEndpoints {
147+
impl FirehoseEndpoints {
156148
pub fn new() -> Self {
157-
Self { endpoints: vec![] }
149+
Self(vec![])
158150
}
159151

160152
pub fn len(&self) -> usize {
161-
self.endpoints.len()
153+
self.0.len()
162154
}
163155

164156
pub fn random(&self) -> Option<&Arc<FirehoseEndpoint>> {
165-
if self.endpoints.len() == 0 {
157+
if self.0.len() == 0 {
166158
return None;
167159
}
168160

169161
// Select from the matching adapters randomly
170162
let mut rng = rand::thread_rng();
171-
Some(&self.endpoints.iter().choose(&mut rng).unwrap().endpoint)
163+
Some(&self.0.iter().choose(&mut rng).unwrap())
172164
}
173165

174166
pub fn remove(&mut self, provider: &str) {
175-
self.endpoints
176-
.retain(|network_endpoint| network_endpoint.endpoint.provider != provider);
167+
self.0
168+
.retain(|network_endpoint| network_endpoint.provider != provider);
177169
}
178170
}
179171

180172
#[derive(Clone, Debug)]
181173
pub struct FirehoseNetworks {
182-
pub networks: BTreeMap<String, FirehoseNetworkEndpoints>,
174+
/// networks contains a map from chain id (`near-mainnet`, `near-testnet`, `solana-mainnet`, etc.)
175+
/// to a list of FirehoseEndpoint (type wrapper around `Arc<Vec<FirehoseEndpoint>>`).
176+
pub networks: BTreeMap<String, FirehoseEndpoints>,
183177
}
184178

185179
impl FirehoseNetworks {
@@ -189,32 +183,33 @@ impl FirehoseNetworks {
189183
}
190184
}
191185

192-
pub fn insert(&mut self, name: String, endpoint: Arc<FirehoseEndpoint>) {
193-
let network_endpoints = self
186+
pub fn insert(&mut self, chain_id: String, endpoint: Arc<FirehoseEndpoint>) {
187+
let endpoints = self
194188
.networks
195-
.entry(name)
196-
.or_insert(FirehoseNetworkEndpoints { endpoints: vec![] });
197-
network_endpoints.endpoints.push(FirehoseNetworkEndpoint {
198-
endpoint: endpoint.clone(),
199-
});
189+
.entry(chain_id)
190+
.or_insert(FirehoseEndpoints::new());
191+
192+
endpoints.0.push(endpoint.clone());
200193
}
201194

202-
pub fn remove(&mut self, name: &str, provider: &str) {
203-
if let Some(endpoints) = self.networks.get_mut(name) {
195+
pub fn remove(&mut self, chain_id: &str, provider: &str) {
196+
if let Some(endpoints) = self.networks.get_mut(chain_id) {
204197
endpoints.remove(provider);
205198
}
206199
}
207200

201+
/// Returns a `Vec` of tuples where the first element of the tuple is
202+
/// the chain's id and the second one is an endpoint for this chain.
203+
/// There can be mulitple tuple with the same chain id but with different
204+
/// endpoint where multiple providers exist for a single chain id.
208205
pub fn flatten(&self) -> Vec<(String, Arc<FirehoseEndpoint>)> {
209206
self.networks
210207
.iter()
211-
.flat_map(|(network_name, firehose_endpoints)| {
208+
.flat_map(|(chain_id, firehose_endpoints)| {
212209
firehose_endpoints
213-
.endpoints
210+
.0
214211
.iter()
215-
.map(move |firehose_endpoint| {
216-
(network_name.clone(), firehose_endpoint.endpoint.clone())
217-
})
212+
.map(move |endpoint| (chain_id.clone(), endpoint.clone()))
218213
})
219214
.collect()
220215
}

node/src/chain.rs

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ use std::time::Duration;
2424
#[derive(PartialEq)]
2525
enum ProviderNetworkStatus {
2626
Broken {
27-
network: String,
27+
chain_id: String,
2828
provider: String,
2929
},
3030
Version {
31-
network: String,
31+
chain_id: String,
3232
ident: ChainIdentifier,
3333
},
3434
}
@@ -258,7 +258,7 @@ pub async fn connect_ethereum_networks(
258258
error!(logger, "Connection to provider failed. Not using this provider";
259259
"error" => e.to_string());
260260
ProviderNetworkStatus::Broken {
261-
network,
261+
chain_id: network,
262262
provider: eth_adapter.provider().to_string(),
263263
}
264264
}
@@ -269,7 +269,10 @@ pub async fn connect_ethereum_networks(
269269
"network_version" => &ident.net_version,
270270
"capabilities" => &capabilities
271271
);
272-
ProviderNetworkStatus::Version { network, ident }
272+
ProviderNetworkStatus::Version {
273+
chain_id: network,
274+
ident,
275+
}
273276
}
274277
}
275278
}),
@@ -282,12 +285,14 @@ pub async fn connect_ethereum_networks(
282285
.into_iter()
283286
.fold(HashMap::new(), |mut networks, status| {
284287
match status {
285-
ProviderNetworkStatus::Broken { network, provider } => {
286-
eth_networks.remove(&network, &provider)
287-
}
288-
ProviderNetworkStatus::Version { network, ident } => {
289-
networks.entry(network.to_string()).or_default().push(ident)
290-
}
288+
ProviderNetworkStatus::Broken {
289+
chain_id: network,
290+
provider,
291+
} => eth_networks.remove(&network, &provider),
292+
ProviderNetworkStatus::Version {
293+
chain_id: network,
294+
ident,
295+
} => networks.entry(network.to_string()).or_default().push(ident),
291296
}
292297
networks
293298
});
@@ -316,11 +321,11 @@ where
316321
firehose_networks
317322
.flatten()
318323
.into_iter()
319-
.map(|(network_name, endpoint)| (network_name, endpoint, logger.clone()))
320-
.map(|(network, endpoint, logger)| async move {
324+
.map(|(chain_id, endpoint)| (chain_id, endpoint, logger.clone()))
325+
.map(|(chain_id, endpoint, logger)| async move {
321326
let logger = logger.new(o!("provider" => endpoint.provider.to_string()));
322327
info!(
323-
logger, "Connecting to Firehose to get network identifier";
328+
logger, "Connecting to Firehose to get chain identifier";
324329
"url" => &endpoint.uri,
325330
);
326331
match tokio::time::timeout(
@@ -336,7 +341,7 @@ where
336341
error!(logger, "Connection to provider failed. Not using this provider";
337342
"error" => e.to_string());
338343
ProviderNetworkStatus::Broken {
339-
network,
344+
chain_id,
340345
provider: endpoint.provider.to_string(),
341346
}
342347
}
@@ -353,28 +358,42 @@ where
353358
genesis_block_hash: ptr.hash,
354359
};
355360

356-
ProviderNetworkStatus::Version { network, ident }
361+
ProviderNetworkStatus::Version { chain_id, ident }
357362
}
358363
}
359364
}),
360365
)
361366
.await;
362367

363-
// Group identifiers by network name
368+
// Group identifiers by chain id
364369
let idents: HashMap<String, Vec<ChainIdentifier>> =
365370
statuses
366371
.into_iter()
367372
.fold(HashMap::new(), |mut networks, status| {
368373
match status {
369-
ProviderNetworkStatus::Broken { network, provider } => {
370-
firehose_networks.remove(&network, &provider)
371-
}
372-
ProviderNetworkStatus::Version { network, ident } => {
373-
networks.entry(network.to_string()).or_default().push(ident)
374+
ProviderNetworkStatus::Broken { chain_id, provider } => {
375+
firehose_networks.remove(&chain_id, &provider)
374376
}
377+
ProviderNetworkStatus::Version { chain_id, ident } => networks
378+
.entry(chain_id.to_string())
379+
.or_default()
380+
.push(ident),
375381
}
376382
networks
377383
});
384+
385+
// Clean-up chains with 0 provider
386+
firehose_networks.networks.retain(|chain_id, endpoints| {
387+
if endpoints.len() == 0 {
388+
error!(
389+
logger,
390+
"No non-broken providers available for chain {}; ignoring this chain", chain_id
391+
);
392+
}
393+
394+
endpoints.len() > 0
395+
});
396+
378397
let idents: Vec<_> = idents.into_iter().collect();
379398
(firehose_networks, idents)
380399
}

node/src/main.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
55
use graph::blockchain::{Block as BlockchainBlock, Blockchain, BlockchainKind, BlockchainMap};
66
use graph::components::store::BlockStore;
77
use graph::data::graphql::effort::LoadManager;
8-
use graph::firehose::{FirehoseNetworkEndpoints, FirehoseNetworks};
8+
use graph::firehose::{FirehoseEndpoints, FirehoseNetworks};
99
use graph::log::logger;
1010
use graph::prelude::{IndexNodeServer as _, JsonRpcServer as _, *};
1111
use graph::prometheus::Registry;
@@ -476,7 +476,7 @@ fn ethereum_networks_as_chains(
476476
chain_store.cheap_clone(),
477477
chain_store,
478478
store.subgraph_store(),
479-
firehose_endpoints.map_or_else(|| FirehoseNetworkEndpoints::new(), |v| v.clone()),
479+
firehose_endpoints.map_or_else(|| FirehoseEndpoints::new(), |v| v.clone()),
480480
eth_adapters.clone(),
481481
chain_head_update_listener.clone(),
482482
*ANCESTOR_COUNT,
@@ -505,38 +505,38 @@ fn near_networks_as_chains(
505505
let chains: Vec<_> = firehose_networks
506506
.networks
507507
.iter()
508-
.filter_map(|(network_name, firehose_endpoints)| {
508+
.filter_map(|(chain_id, endpoints)| {
509509
store
510510
.block_store()
511-
.chain_store(network_name)
512-
.map(|chain_store| (network_name, chain_store, firehose_endpoints))
511+
.chain_store(chain_id)
512+
.map(|chain_store| (chain_id, chain_store, endpoints))
513513
.or_else(|| {
514514
error!(
515515
logger,
516-
"No store configured for NEAR chain {}; ignoring this chain", network_name
516+
"No store configured for NEAR chain {}; ignoring this chain", chain_id
517517
);
518518
None
519519
})
520520
})
521-
.map(|(network_name, chain_store, firehose_endpoints)| {
521+
.map(|(chain_id, chain_store, endpoints)| {
522522
(
523-
network_name.clone(),
523+
chain_id.clone(),
524524
FirehoseChain {
525525
chain: Arc::new(near::Chain::new(
526526
logger_factory.clone(),
527-
network_name.clone(),
527+
chain_id.clone(),
528528
chain_store,
529-
firehose_endpoints.clone(),
529+
endpoints.clone(),
530530
)),
531-
firehose_endpoints: firehose_endpoints.clone(),
531+
firehose_endpoints: endpoints.clone(),
532532
},
533533
)
534534
})
535535
.collect();
536536

537-
for (network_name, firehose_chain) in chains.iter() {
537+
for (chain_id, firehose_chain) in chains.iter() {
538538
blockchain_map
539-
.insert::<graph_chain_near::Chain>(network_name.clone(), firehose_chain.chain.clone())
539+
.insert::<graph_chain_near::Chain>(chain_id.clone(), firehose_chain.chain.clone())
540540
}
541541

542542
HashMap::from_iter(chains)
@@ -594,7 +594,7 @@ fn start_block_ingestor(
594594
#[derive(Clone)]
595595
struct FirehoseChain<C: Blockchain> {
596596
chain: Arc<C>,
597-
firehose_endpoints: FirehoseNetworkEndpoints,
597+
firehose_endpoints: FirehoseEndpoints,
598598
}
599599

600600
fn start_firehose_block_ingestor<C, M>(

0 commit comments

Comments
 (0)