Skip to content

Commit a5eedc7

Browse files
authored
Fix null public pois (#4768)
* refactor(status api): refactor resolve_block_hash_from_number * fix(status api): Add timeout to block_ptr_for_number * fix(status api): Fetch block hash on the fly in fn get_public_proof_of_indexing
1 parent ba5f12f commit a5eedc7

File tree

4 files changed

+163
-106
lines changed

4 files changed

+163
-106
lines changed

graph/src/components/store/traits.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,5 +606,15 @@ pub trait StatusStore: Send + Sync + 'static {
606606
&self,
607607
subgraph_id: &DeploymentHash,
608608
block_number: BlockNumber,
609+
fetch_block_ptr: &dyn BlockPtrForNumber,
609610
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;
610611
}
612+
613+
#[async_trait]
614+
pub trait BlockPtrForNumber: Send + Sync {
615+
async fn block_ptr_for_number(
616+
&self,
617+
network: String,
618+
number: BlockNumber,
619+
) -> Result<Option<BlockPtr>, Error>;
620+
}

server/index-node/src/resolver.rs

Lines changed: 117 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use graph::data::query::Trace;
55
use web3::types::Address;
66

77
use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap};
8-
use graph::components::store::{BlockStore, EntityType, Store};
8+
use graph::components::store::{BlockPtrForNumber, BlockStore, EntityType, Store};
99
use graph::components::versions::VERSIONS;
1010
use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap};
1111
use graph::data::subgraph::status;
@@ -15,6 +15,9 @@ use graph_graphql::prelude::{a, ExecutionContext, Resolver};
1515

1616
use crate::auth::PoiProtection;
1717

18+
/// Timeout for calls to fetch the block from JSON-RPC or Firehose.
19+
const BLOCK_HASH_FROM_NUMBER_TIMEOUT: Duration = Duration::from_secs(10);
20+
1821
#[derive(Clone, Debug)]
1922
struct PublicProofOfIndexingRequest {
2023
pub deployment: DeploymentHash,
@@ -212,62 +215,10 @@ impl<S: Store> IndexNodeResolver<S> {
212215
.get_required::<BlockNumber>("blockNumber")
213216
.expect("Valid blockNumber required");
214217

215-
macro_rules! try_resolve_for_chain {
216-
( $typ:path ) => {
217-
let blockchain = self.blockchain_map.get::<$typ>(network.to_string()).ok();
218-
219-
if let Some(blockchain) = blockchain {
220-
debug!(
221-
self.logger,
222-
"Fetching block hash from number";
223-
"network" => &network,
224-
"block_number" => block_number,
225-
);
226-
227-
let block_ptr_res = blockchain
228-
.block_pointer_from_number(&self.logger, block_number)
229-
.await;
230-
231-
if let Err(e) = block_ptr_res {
232-
warn!(
233-
self.logger,
234-
"Failed to fetch block hash from number";
235-
"network" => &network,
236-
"chain" => <$typ as Blockchain>::KIND.to_string(),
237-
"block_number" => block_number,
238-
"error" => e.to_string(),
239-
);
240-
return Ok(r::Value::Null);
241-
}
242-
243-
let block_ptr = block_ptr_res.unwrap();
244-
return Ok(r::Value::String(block_ptr.hash_hex()));
245-
}
246-
};
218+
match self.block_ptr_for_number(network, block_number).await? {
219+
Some(block_ptr) => Ok(r::Value::String(block_ptr.hash_hex())),
220+
None => Ok(r::Value::Null),
247221
}
248-
249-
// Ugly, but we can't get back an object trait from the `BlockchainMap`,
250-
// so this seems like the next best thing.
251-
try_resolve_for_chain!(graph_chain_ethereum::Chain);
252-
try_resolve_for_chain!(graph_chain_arweave::Chain);
253-
try_resolve_for_chain!(graph_chain_cosmos::Chain);
254-
try_resolve_for_chain!(graph_chain_near::Chain);
255-
256-
// If you're adding support for a new chain and this `match` clause just
257-
// gave you a compiler error, then this message is for you! You need to
258-
// add a new `try_resolve!` macro invocation above for your new chain
259-
// type.
260-
match BlockchainKind::Ethereum {
261-
// Note: we don't actually care about substreams here.
262-
BlockchainKind::Substreams
263-
| BlockchainKind::Arweave
264-
| BlockchainKind::Ethereum
265-
| BlockchainKind::Cosmos
266-
| BlockchainKind::Near => (),
267-
}
268-
269-
// The given network does not exist.
270-
Ok(r::Value::Null)
271222
}
272223

273224
async fn resolve_cached_ethereum_calls(
@@ -405,7 +356,7 @@ impl<S: Store> IndexNodeResolver<S> {
405356
Ok(poi)
406357
}
407358

408-
fn resolve_public_proofs_of_indexing(
359+
async fn resolve_public_proofs_of_indexing(
409360
&self,
410361
field: &a::Field,
411362
) -> Result<r::Value, QueryExecutionError> {
@@ -420,41 +371,41 @@ impl<S: Store> IndexNodeResolver<S> {
420371
return Err(QueryExecutionError::TooExpensive);
421372
}
422373

423-
Ok(r::Value::List(
424-
requests
425-
.into_iter()
426-
.map(|request| {
427-
match futures::executor::block_on(
428-
self.store.get_public_proof_of_indexing(
429-
&request.deployment,
430-
request.block_number,
431-
),
432-
) {
433-
Ok(Some(poi)) => (Some(poi), request),
434-
Ok(None) => (None, request),
435-
Err(e) => {
436-
error!(
437-
self.logger,
438-
"Failed to query public proof of indexing";
439-
"subgraph" => &request.deployment,
440-
"block" => format!("{}", request.block_number),
441-
"error" => format!("{:?}", e)
442-
);
443-
(None, request)
444-
}
445-
}
446-
})
447-
.map(|(poi_result, request)| PublicProofOfIndexingResult {
374+
let mut public_poi_results = vec![];
375+
for request in requests {
376+
let (poi_result, request) = match self
377+
.store
378+
.get_public_proof_of_indexing(&request.deployment, request.block_number, self)
379+
.await
380+
{
381+
Ok(Some(poi)) => (Some(poi), request),
382+
Ok(None) => (None, request),
383+
Err(e) => {
384+
error!(
385+
self.logger,
386+
"Failed to query public proof of indexing";
387+
"subgraph" => &request.deployment,
388+
"block" => format!("{}", request.block_number),
389+
"error" => format!("{:?}", e)
390+
);
391+
(None, request)
392+
}
393+
};
394+
395+
public_poi_results.push(
396+
PublicProofOfIndexingResult {
448397
deployment: request.deployment,
449398
block: match poi_result {
450399
Some((ref block, _)) => block.clone(),
451400
None => PartialBlockPtr::from(request.block_number),
452401
},
453402
proof_of_indexing: poi_result.map(|(_, poi)| poi),
454-
})
455-
.map(IntoValue::into_value)
456-
.collect(),
457-
))
403+
}
404+
.into_value(),
405+
)
406+
}
407+
408+
Ok(r::Value::List(public_poi_results))
458409
}
459410

460411
fn resolve_indexing_status_for_version(
@@ -517,6 +468,85 @@ impl<S: Store> IndexNodeResolver<S> {
517468
.collect(),
518469
))
519470
}
471+
472+
async fn block_ptr_for_number(
473+
&self,
474+
network: String,
475+
block_number: BlockNumber,
476+
) -> Result<Option<BlockPtr>, QueryExecutionError> {
477+
macro_rules! try_resolve_for_chain {
478+
( $typ:path ) => {
479+
let blockchain = self.blockchain_map.get::<$typ>(network.to_string()).ok();
480+
481+
if let Some(blockchain) = blockchain {
482+
debug!(
483+
self.logger,
484+
"Fetching block hash from number";
485+
"network" => &network,
486+
"block_number" => block_number,
487+
);
488+
489+
let block_ptr_res = tokio::time::timeout(BLOCK_HASH_FROM_NUMBER_TIMEOUT, blockchain
490+
.block_pointer_from_number(&self.logger, block_number)
491+
.map_err(Error::from))
492+
.await
493+
.map_err(Error::from)
494+
.and_then(|x| x);
495+
496+
if let Err(e) = block_ptr_res {
497+
warn!(
498+
self.logger,
499+
"Failed to fetch block hash from number";
500+
"network" => &network,
501+
"chain" => <$typ as Blockchain>::KIND.to_string(),
502+
"block_number" => block_number,
503+
"error" => e.to_string(),
504+
);
505+
return Ok(None);
506+
}
507+
508+
let block_ptr = block_ptr_res.unwrap();
509+
return Ok(Some(block_ptr));
510+
}
511+
};
512+
}
513+
514+
// Ugly, but we can't get back an object trait from the `BlockchainMap`,
515+
// so this seems like the next best thing.
516+
try_resolve_for_chain!(graph_chain_ethereum::Chain);
517+
try_resolve_for_chain!(graph_chain_arweave::Chain);
518+
try_resolve_for_chain!(graph_chain_cosmos::Chain);
519+
try_resolve_for_chain!(graph_chain_near::Chain);
520+
521+
// If you're adding support for a new chain and this `match` clause just
522+
// gave you a compiler error, then this message is for you! You need to
523+
// add a new `try_resolve!` macro invocation above for your new chain
524+
// type.
525+
match BlockchainKind::Ethereum {
526+
// Note: we don't actually care about substreams here.
527+
BlockchainKind::Substreams
528+
| BlockchainKind::Arweave
529+
| BlockchainKind::Ethereum
530+
| BlockchainKind::Cosmos
531+
| BlockchainKind::Near => (),
532+
}
533+
534+
// The given network does not exist.
535+
Ok(None)
536+
}
537+
}
538+
539+
#[async_trait]
540+
impl<S: Store> BlockPtrForNumber for IndexNodeResolver<S> {
541+
async fn block_ptr_for_number(
542+
&self,
543+
network: String,
544+
block_number: BlockNumber,
545+
) -> Result<Option<BlockPtr>, Error> {
546+
self.block_ptr_for_number(network, block_number)
547+
.map_err(Error::from)
548+
.await
549+
}
520550
}
521551

522552
fn entity_changes_to_graphql(entity_changes: Vec<EntityOperation>) -> r::Value {
@@ -643,7 +673,7 @@ impl<S: Store> Resolver for IndexNodeResolver<S> {
643673

644674
// The top-level `publicProofsOfIndexing` field
645675
(None, "PublicProofOfIndexingResult", "publicProofsOfIndexing") => {
646-
self.resolve_public_proofs_of_indexing(field)
676+
self.resolve_public_proofs_of_indexing(field).await
647677
}
648678

649679
// Resolve fields of `Object` values (e.g. the `chains` field of `ChainIndexingStatus`)

store/postgres/src/store.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use graph::{
55
components::{
66
server::index_node::VersionInfo,
77
store::{
8-
BlockStore as BlockStoreTrait, QueryStoreManager, StatusStore, Store as StoreTrait,
8+
BlockPtrForNumber, BlockStore as BlockStoreTrait, QueryStoreManager, StatusStore,
9+
Store as StoreTrait,
910
},
1011
},
1112
constraint_violation,
@@ -155,9 +156,15 @@ impl StatusStore for Store {
155156
&self,
156157
subgraph_id: &DeploymentHash,
157158
block_number: BlockNumber,
159+
fetch_block_ptr: &dyn BlockPtrForNumber,
158160
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> {
159161
self.subgraph_store
160-
.get_public_proof_of_indexing(subgraph_id, block_number, self.block_store().clone())
162+
.get_public_proof_of_indexing(
163+
subgraph_id,
164+
block_number,
165+
self.block_store().clone(),
166+
fetch_block_ptr,
167+
)
161168
.await
162169
}
163170

store/postgres/src/subgraph_store.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use graph::{
1616
components::{
1717
server::index_node::VersionInfo,
1818
store::{
19-
self, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait, PruneReporter,
20-
PruneRequest, SubgraphFork,
19+
self, BlockPtrForNumber, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait,
20+
PruneReporter, PruneRequest, SubgraphFork,
2121
},
2222
},
2323
constraint_violation,
@@ -243,9 +243,10 @@ impl SubgraphStore {
243243
id: &DeploymentHash,
244244
block_number: BlockNumber,
245245
block_store: Arc<impl BlockStore>,
246+
fetch_block_ptr: &dyn BlockPtrForNumber,
246247
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> {
247248
self.inner
248-
.get_public_proof_of_indexing(id, block_number, block_store)
249+
.get_public_proof_of_indexing(id, block_number, block_store, fetch_block_ptr)
249250
.await
250251
}
251252

@@ -990,24 +991,33 @@ impl SubgraphStoreInner {
990991
id: &DeploymentHash,
991992
block_number: BlockNumber,
992993
block_store: Arc<impl BlockStore>,
994+
fetch_block_ptr: &dyn BlockPtrForNumber,
993995
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> {
994996
let (store, site) = self.store(id)?;
995997

996-
let chain_store = match block_store.chain_store(&site.network) {
997-
Some(chain_store) => chain_store,
998-
None => return Ok(None),
998+
let block_hash = {
999+
let chain_store = match block_store.chain_store(&site.network) {
1000+
Some(chain_store) => chain_store,
1001+
None => return Ok(None),
1002+
};
1003+
let mut hashes = chain_store.block_hashes_by_block_number(block_number)?;
1004+
1005+
// If we have multiple versions of this block using any of them could introduce
1006+
// non-determinism because we don't know which one is the right one
1007+
if hashes.len() == 1 {
1008+
hashes.pop().unwrap()
1009+
} else {
1010+
match fetch_block_ptr
1011+
.block_ptr_for_number(site.network.clone(), block_number)
1012+
.await
1013+
.ok()
1014+
.flatten()
1015+
{
1016+
None => return Ok(None),
1017+
Some(block_ptr) => block_ptr.hash,
1018+
}
1019+
}
9991020
};
1000-
let mut hashes = chain_store.block_hashes_by_block_number(block_number)?;
1001-
1002-
// If we don't have this block or we have multiple versions of this block
1003-
// and using any of them could introduce non-deterministic because we don't
1004-
// know which one is the right one -> return no block hash
1005-
if hashes.is_empty() || hashes.len() > 1 {
1006-
return Ok(None);
1007-
}
1008-
1009-
// This `unwrap` is safe to do now
1010-
let block_hash = hashes.pop().unwrap();
10111021

10121022
let block_for_poi_query = BlockPtr::new(block_hash.clone(), block_number);
10131023
let indexer = Some(Address::zero());

0 commit comments

Comments
 (0)