Skip to content

Commit 844cff6

Browse files
Matthieu Vachonmaoueh
andauthored
chain/ethereum,graph,node: moved BlockIngestor to chain/ethereum and removed IngestorAdapter (#3131)
The IngestorAdapter was actually protocol specific and having the abstraction in the Blockchain trait was not used expect for Ethereum chain. Removed `IngestorAdapter` from block chain and moved the chain agnostic `BlockIngestor` to `graph-chain-ethereum` crate. Co-authored-by: Matthieu Vachon <matt@streamingfast.io>
1 parent d6ba163 commit 844cff6

File tree

7 files changed

+118
-219
lines changed

7 files changed

+118
-219
lines changed

chain/ethereum/src/chain.rs

Lines changed: 10 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,15 @@ use graph::{
1414
},
1515
firehose_block_stream::FirehoseBlockStream,
1616
polling_block_stream::PollingBlockStream,
17-
Block, BlockHash, BlockPtr, Blockchain, ChainHeadUpdateListener,
18-
IngestorAdapter as IngestorAdapterTrait, IngestorError, TriggerFilter as _,
17+
Block, BlockPtr, Blockchain, ChainHeadUpdateListener, IngestorError, TriggerFilter as _,
1918
},
2019
cheap_clone::CheapClone,
2120
components::store::DeploymentLocator,
2221
firehose,
23-
log::factory::{ComponentLoggerConfig, ElasticComponentLoggerConfig},
2422
prelude::{
25-
async_trait, error, lazy_static, o, serde_json as json, web3::types::H256, BlockNumber,
26-
ChainStore, EthereumBlockWithCalls, Future01CompatExt, Logger, LoggerFactory,
27-
MetricsRegistry, NodeId, SubgraphStore,
23+
async_trait, lazy_static, o, serde_json as json, BlockNumber, ChainStore,
24+
EthereumBlockWithCalls, Future01CompatExt, Logger, LoggerFactory, MetricsRegistry, NodeId,
25+
SubgraphStore,
2826
},
2927
};
3028
use prost::Message;
@@ -72,7 +70,6 @@ pub struct Chain {
7270
registry: Arc<dyn MetricsRegistry>,
7371
firehose_endpoints: Arc<FirehoseEndpoints>,
7472
eth_adapters: Arc<EthereumNetworkAdapters>,
75-
ancestor_count: BlockNumber,
7673
chain_store: Arc<dyn ChainStore>,
7774
call_cache: Arc<dyn EthereumCallCache>,
7875
subgraph_store: Arc<dyn SubgraphStore>,
@@ -99,7 +96,6 @@ impl Chain {
9996
firehose_endpoints: FirehoseEndpoints,
10097
eth_adapters: EthereumNetworkAdapters,
10198
chain_head_update_listener: Arc<dyn ChainHeadUpdateListener>,
102-
ancestor_count: BlockNumber,
10399
reorg_threshold: BlockNumber,
104100
is_ingestible: bool,
105101
) -> Self {
@@ -110,7 +106,6 @@ impl Chain {
110106
registry,
111107
firehose_endpoints: Arc::new(firehose_endpoints),
112108
eth_adapters: Arc::new(eth_adapters),
113-
ancestor_count,
114109
chain_store,
115110
call_cache,
116111
subgraph_store,
@@ -121,6 +116,12 @@ impl Chain {
121116
}
122117
}
123118

119+
impl Chain {
120+
pub fn cheapest_adapter(&self) -> Arc<EthereumAdapter> {
121+
self.eth_adapters.cheapest().unwrap().clone()
122+
}
123+
}
124+
124125
#[async_trait]
125126
impl Blockchain for Chain {
126127
const KIND: BlockchainKind = BlockchainKind::Ethereum;
@@ -145,8 +146,6 @@ impl Blockchain for Chain {
145146

146147
type NodeCapabilities = crate::capabilities::NodeCapabilities;
147148

148-
type IngestorAdapter = IngestorAdapter;
149-
150149
type RuntimeAdapter = RuntimeAdapter;
151150

152151
fn triggers_adapter(
@@ -299,29 +298,6 @@ impl Blockchain for Chain {
299298
)))
300299
}
301300

302-
fn ingestor_adapter(&self) -> Arc<Self::IngestorAdapter> {
303-
let eth_adapter = self.eth_adapters.cheapest().unwrap().clone();
304-
let logger = self
305-
.logger_factory
306-
.component_logger(
307-
"BlockIngestor",
308-
Some(ComponentLoggerConfig {
309-
elastic: Some(ElasticComponentLoggerConfig {
310-
index: String::from("block-ingestor-logs"),
311-
}),
312-
}),
313-
)
314-
.new(o!("provider" => eth_adapter.provider().to_string()));
315-
316-
let adapter = IngestorAdapter {
317-
eth_adapter,
318-
logger,
319-
ancestor_count: self.ancestor_count,
320-
chain_store: self.chain_store.clone(),
321-
};
322-
Arc::new(adapter)
323-
}
324-
325301
fn chain_store(&self) -> Arc<dyn ChainStore> {
326302
self.chain_store.clone()
327303
}
@@ -624,79 +600,3 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
624600
}
625601
}
626602
}
627-
628-
pub struct IngestorAdapter {
629-
logger: Logger,
630-
ancestor_count: i32,
631-
eth_adapter: Arc<EthereumAdapter>,
632-
chain_store: Arc<dyn ChainStore>,
633-
}
634-
635-
#[async_trait]
636-
impl IngestorAdapterTrait<Chain> for IngestorAdapter {
637-
fn logger(&self) -> &Logger {
638-
&self.logger
639-
}
640-
641-
fn ancestor_count(&self) -> BlockNumber {
642-
self.ancestor_count
643-
}
644-
645-
async fn latest_block(&self) -> Result<BlockPtr, IngestorError> {
646-
self.eth_adapter
647-
.latest_block_header(&self.logger)
648-
.compat()
649-
.await
650-
.map(|block| block.into())
651-
}
652-
653-
async fn ingest_block(
654-
&self,
655-
block_hash: &BlockHash,
656-
) -> Result<Option<BlockHash>, IngestorError> {
657-
// TODO: H256::from_slice can panic
658-
let block_hash = H256::from_slice(block_hash.as_slice());
659-
660-
// Get the fully populated block
661-
let block = self
662-
.eth_adapter
663-
.block_by_hash(&self.logger, block_hash)
664-
.compat()
665-
.await?
666-
.ok_or_else(|| IngestorError::BlockUnavailable(block_hash))?;
667-
let ethereum_block = self
668-
.eth_adapter
669-
.load_full_block(&self.logger, block)
670-
.await?;
671-
672-
// We need something that implements `Block` to store the block; the
673-
// store does not care whether the block is final or not
674-
let ethereum_block = BlockFinality::NonFinal(EthereumBlockWithCalls {
675-
ethereum_block,
676-
calls: None,
677-
});
678-
679-
// Store it in the database and try to advance the chain head pointer
680-
self.chain_store
681-
.upsert_block(Arc::new(ethereum_block))
682-
.await?;
683-
684-
self.chain_store
685-
.cheap_clone()
686-
.attempt_chain_head_update(self.ancestor_count)
687-
.await
688-
.map(|missing| missing.map(|h256| h256.into()))
689-
.map_err(|e| {
690-
error!(self.logger, "failed to update chain head");
691-
IngestorError::Unknown(e)
692-
})
693-
}
694-
695-
fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
696-
self.chain_store.chain_head_ptr()
697-
}
698-
699-
fn cleanup_cached_blocks(&self) -> Result<Option<(i32, usize)>, Error> {
700-
self.chain_store.cleanup_cached_blocks(self.ancestor_count)
701-
}
702-
}

graph/src/blockchain/block_ingestor.rs renamed to chain/ethereum/src/ingestor.rs

Lines changed: 78 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
use std::{sync::Arc, time::Duration};
2-
3-
use crate::{
4-
blockchain::{Blockchain, IngestorAdapter, IngestorError},
5-
prelude::{info, lazy_static, tokio, trace, warn, Error, LogCode, Logger},
1+
use crate::{chain::BlockFinality, EthereumAdapter, EthereumAdapterTrait};
2+
use graph::{
3+
blockchain::{BlockHash, BlockPtr, IngestorError},
4+
cheap_clone::CheapClone,
5+
prelude::{
6+
error, ethabi::ethereum_types::H256, info, lazy_static, tokio, trace, warn, ChainStore,
7+
Error, EthereumBlockWithCalls, Future01CompatExt, LogCode, Logger,
8+
},
69
};
10+
use std::{sync::Arc, time::Duration};
711

812
lazy_static! {
913
// graph_node::config disallows setting this in a store with multiple
@@ -14,27 +18,27 @@ lazy_static! {
1418
.unwrap_or(false);
1519
}
1620

17-
pub struct BlockIngestor<C>
18-
where
19-
C: Blockchain,
20-
{
21-
adapter: Arc<C::IngestorAdapter>,
21+
pub struct BlockIngestor {
2222
logger: Logger,
23+
ancestor_count: i32,
24+
eth_adapter: Arc<EthereumAdapter>,
25+
chain_store: Arc<dyn ChainStore>,
2326
polling_interval: Duration,
2427
}
2528

26-
impl<C> BlockIngestor<C>
27-
where
28-
C: Blockchain,
29-
{
29+
impl BlockIngestor {
3030
pub fn new(
31-
adapter: Arc<C::IngestorAdapter>,
31+
logger: Logger,
32+
ancestor_count: i32,
33+
eth_adapter: Arc<EthereumAdapter>,
34+
chain_store: Arc<dyn ChainStore>,
3235
polling_interval: Duration,
33-
) -> Result<BlockIngestor<C>, Error> {
34-
let logger = adapter.logger().clone();
36+
) -> Result<BlockIngestor, Error> {
3537
Ok(BlockIngestor {
36-
adapter,
3738
logger,
39+
ancestor_count,
40+
eth_adapter,
41+
chain_store,
3842
polling_interval,
3943
})
4044
}
@@ -73,7 +77,7 @@ where
7377
}
7478

7579
fn cleanup_cached_blocks(&self) {
76-
match self.adapter.cleanup_cached_blocks() {
80+
match self.chain_store.cleanup_cached_blocks(self.ancestor_count) {
7781
Ok(Some((min_block, count))) => {
7882
if count > 0 {
7983
info!(
@@ -97,12 +101,12 @@ where
97101
trace!(self.logger, "BlockIngestor::do_poll");
98102

99103
// Get chain head ptr from store
100-
let head_block_ptr_opt = self.adapter.chain_head_ptr()?;
104+
let head_block_ptr_opt = self.chain_store.chain_head_ptr()?;
101105

102106
// To check if there is a new block or not, fetch only the block header since that's cheaper
103107
// than the full block. This is worthwhile because most of the time there won't be a new
104108
// block, as we expect the poll interval to be much shorter than the block time.
105-
let latest_block = self.adapter.latest_block().await?;
109+
let latest_block = self.latest_block().await?;
106110

107111
// If latest block matches head block in store, nothing needs to be done
108112
if Some(&latest_block) == head_block_ptr_opt.as_ref() {
@@ -122,7 +126,7 @@ where
122126
let latest_number = latest_block.number;
123127
let head_number = head_block_ptr.number;
124128
let distance = latest_number - head_number;
125-
let blocks_needed = (distance).min(self.adapter.ancestor_count());
129+
let blocks_needed = (distance).min(self.ancestor_count);
126130
let code = if distance >= 15 {
127131
LogCode::BlockIngestionLagging
128132
} else {
@@ -147,7 +151,7 @@ where
147151
// Might be a no-op if latest block is one that we have seen.
148152
// ingest_blocks will return a (potentially incomplete) list of blocks that are
149153
// missing.
150-
let mut missing_block_hash = self.adapter.ingest_block(&latest_block.hash).await?;
154+
let mut missing_block_hash = self.ingest_block(&latest_block.hash).await?;
151155

152156
// Repeatedly fetch missing parent blocks, and ingest them.
153157
// ingest_blocks will continue to tell us about more missing parent
@@ -168,8 +172,58 @@ where
168172
// iteration will have at most block number N-1.
169173
// - Therefore, the loop will iterate at most ancestor_count times.
170174
while let Some(hash) = missing_block_hash {
171-
missing_block_hash = self.adapter.ingest_block(&hash).await?;
175+
missing_block_hash = self.ingest_block(&hash).await?;
172176
}
173177
Ok(())
174178
}
179+
180+
async fn ingest_block(
181+
&self,
182+
block_hash: &BlockHash,
183+
) -> Result<Option<BlockHash>, IngestorError> {
184+
// TODO: H256::from_slice can panic
185+
let block_hash = H256::from_slice(block_hash.as_slice());
186+
187+
// Get the fully populated block
188+
let block = self
189+
.eth_adapter
190+
.block_by_hash(&self.logger, block_hash)
191+
.compat()
192+
.await?
193+
.ok_or_else(|| IngestorError::BlockUnavailable(block_hash))?;
194+
let ethereum_block = self
195+
.eth_adapter
196+
.load_full_block(&self.logger, block)
197+
.await?;
198+
199+
// We need something that implements `Block` to store the block; the
200+
// store does not care whether the block is final or not
201+
let ethereum_block = BlockFinality::NonFinal(EthereumBlockWithCalls {
202+
ethereum_block,
203+
calls: None,
204+
});
205+
206+
// Store it in the database and try to advance the chain head pointer
207+
self.chain_store
208+
.upsert_block(Arc::new(ethereum_block))
209+
.await?;
210+
211+
self.chain_store
212+
.cheap_clone()
213+
.attempt_chain_head_update(self.ancestor_count)
214+
.await
215+
.map(|missing| missing.map(|h256| h256.into()))
216+
.map_err(|e| {
217+
error!(self.logger, "failed to update chain head");
218+
IngestorError::Unknown(e)
219+
})
220+
}
221+
222+
async fn latest_block(&self) -> Result<BlockPtr, IngestorError> {
223+
self.eth_adapter
224+
.latest_block_header(&self.logger)
225+
.compat()
226+
.await
227+
.map(|block| block.into())
228+
}
175229
}

chain/ethereum/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod capabilities;
33
pub mod codec;
44
mod data_source;
55
mod ethereum_adapter;
6+
mod ingestor;
67
pub mod runtime;
78
mod transport;
89

@@ -26,6 +27,7 @@ pub use crate::adapter::{
2627
};
2728
pub use crate::chain::Chain;
2829
pub use crate::network::EthereumNetworks;
30+
pub use ingestor::{BlockIngestor, CLEANUP_BLOCKS};
2931

3032
#[cfg(test)]
3133
mod tests;

0 commit comments

Comments
 (0)