Skip to content

Commit 5c22a5c

Browse files
authored
all: Refactor adapter handle, BlockStream, BlockIngestor (#4411)
- Move BlockIngestor init done by Blockchain trait - BlockIngestor is now a trait - BlockStream init now done by Blockchain trait - Adapter not sticky on BlockIngestor and BlockStream
1 parent a60d00d commit 5c22a5c

File tree

26 files changed

+517
-567
lines changed

26 files changed

+517
-567
lines changed

chain/arweave/src/chain.rs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
use graph::anyhow;
12
use graph::blockchain::client::ChainClient;
3+
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
24
use graph::blockchain::{
3-
BasicBlockchainBuilder, Block, BlockchainBuilder, BlockchainKind, EmptyNodeCapabilities,
4-
NoopRuntimeAdapter,
5+
BasicBlockchainBuilder, Block, BlockIngestor, BlockchainBuilder, BlockchainKind,
6+
EmptyNodeCapabilities, NoopRuntimeAdapter,
57
};
68
use graph::cheap_clone::CheapClone;
9+
use graph::components::store::DeploymentCursorTracker;
710
use graph::data::subgraph::UnifiedMappingApiVersion;
811
use graph::firehose::FirehoseEndpoint;
912
use graph::prelude::MetricsRegistry;
@@ -103,12 +106,11 @@ impl Blockchain for Chain {
103106
unimplemented!("This chain does not support Dynamic Data Sources. is_refetch_block_required always returns false, this shouldn't be called.")
104107
}
105108

106-
async fn new_firehose_block_stream(
109+
async fn new_block_stream(
107110
&self,
108111
deployment: DeploymentLocator,
109-
block_cursor: FirehoseCursor,
112+
store: impl DeploymentCursorTracker,
110113
start_blocks: Vec<BlockNumber>,
111-
subgraph_current_block: Option<BlockPtr>,
112114
filter: Arc<Self::TriggerFilter>,
113115
unified_api_version: UnifiedMappingApiVersion,
114116
) -> Result<Box<dyn BlockStream<Self>>, Error> {
@@ -120,7 +122,6 @@ impl Blockchain for Chain {
120122
)
121123
.unwrap_or_else(|_| panic!("no adapter for network {}", self.name));
122124

123-
let firehose_endpoint = self.client.firehose_endpoint()?;
124125
let logger = self
125126
.logger_factory
126127
.subgraph_logger(&deployment)
@@ -130,9 +131,9 @@ impl Blockchain for Chain {
130131

131132
Ok(Box::new(FirehoseBlockStream::new(
132133
deployment.hash,
133-
firehose_endpoint,
134-
subgraph_current_block,
135-
block_cursor,
134+
self.chain_client(),
135+
store.block_ptr(),
136+
store.firehose_cursor(),
136137
firehose_mapper,
137138
adapter,
138139
filter,
@@ -142,17 +143,6 @@ impl Blockchain for Chain {
142143
)))
143144
}
144145

145-
async fn new_polling_block_stream(
146-
&self,
147-
_deployment: DeploymentLocator,
148-
_start_blocks: Vec<BlockNumber>,
149-
_subgraph_current_block: Option<BlockPtr>,
150-
_filter: Arc<Self::TriggerFilter>,
151-
_unified_api_version: UnifiedMappingApiVersion,
152-
) -> Result<Box<dyn BlockStream<Self>>, Error> {
153-
panic!("Arweave does not support polling block stream")
154-
}
155-
156146
fn chain_store(&self) -> Arc<dyn ChainStore> {
157147
self.chain_store.clone()
158148
}
@@ -176,6 +166,17 @@ impl Blockchain for Chain {
176166
fn chain_client(&self) -> Arc<ChainClient<Self>> {
177167
self.client.clone()
178168
}
169+
170+
fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
171+
let ingestor = FirehoseBlockIngestor::<crate::Block, Self>::new(
172+
self.chain_store.cheap_clone(),
173+
self.chain_client(),
174+
self.logger_factory
175+
.component_logger("ArweaveFirehoseBlockIngestor", None),
176+
self.name.clone(),
177+
);
178+
Ok(Box::new(ingestor))
179+
}
179180
}
180181

181182
pub struct TriggersAdapter {}

chain/cosmos/src/chain.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
2+
use graph::blockchain::BlockIngestor;
13
use std::sync::Arc;
24

35
use graph::blockchain::block_stream::FirehoseCursor;
46
use graph::blockchain::client::ChainClient;
57
use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter};
68
use graph::cheap_clone::CheapClone;
9+
use graph::components::store::DeploymentCursorTracker;
710
use graph::data::subgraph::UnifiedMappingApiVersion;
811
use graph::prelude::MetricsRegistry;
912
use graph::{
@@ -98,12 +101,11 @@ impl Blockchain for Chain {
98101
Ok(Arc::new(adapter))
99102
}
100103

101-
async fn new_firehose_block_stream(
104+
async fn new_block_stream(
102105
&self,
103106
deployment: DeploymentLocator,
104-
block_cursor: FirehoseCursor,
107+
store: impl DeploymentCursorTracker,
105108
start_blocks: Vec<BlockNumber>,
106-
subgraph_current_block: Option<BlockPtr>,
107109
filter: Arc<Self::TriggerFilter>,
108110
unified_api_version: UnifiedMappingApiVersion,
109111
) -> Result<Box<dyn BlockStream<Self>>, Error> {
@@ -115,8 +117,6 @@ impl Blockchain for Chain {
115117
)
116118
.unwrap_or_else(|_| panic!("no adapter for network {}", self.name));
117119

118-
let firehose_endpoint = self.client.firehose_endpoint()?;
119-
120120
let logger = self
121121
.logger_factory
122122
.subgraph_logger(&deployment)
@@ -126,9 +126,9 @@ impl Blockchain for Chain {
126126

127127
Ok(Box::new(FirehoseBlockStream::new(
128128
deployment.hash,
129-
firehose_endpoint,
130-
subgraph_current_block,
131-
block_cursor,
129+
self.chain_client(),
130+
store.block_ptr(),
131+
store.firehose_cursor(),
132132
firehose_mapper,
133133
adapter,
134134
filter,
@@ -138,17 +138,6 @@ impl Blockchain for Chain {
138138
)))
139139
}
140140

141-
async fn new_polling_block_stream(
142-
&self,
143-
_deployment: DeploymentLocator,
144-
_start_blocks: Vec<BlockNumber>,
145-
_subgraph_start_block: Option<BlockPtr>,
146-
_filter: Arc<Self::TriggerFilter>,
147-
_unified_api_version: UnifiedMappingApiVersion,
148-
) -> Result<Box<dyn BlockStream<Self>>, Error> {
149-
panic!("Cosmos does not support polling block stream")
150-
}
151-
152141
fn chain_store(&self) -> Arc<dyn ChainStore> {
153142
self.chain_store.cheap_clone()
154143
}
@@ -173,6 +162,17 @@ impl Blockchain for Chain {
173162
fn chain_client(&self) -> Arc<ChainClient<Self>> {
174163
self.client.clone()
175164
}
165+
166+
fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
167+
let ingestor = FirehoseBlockIngestor::<crate::Block, Self>::new(
168+
self.chain_store.cheap_clone(),
169+
self.chain_client(),
170+
self.logger_factory
171+
.component_logger("CosmosFirehoseBlockIngestor", None),
172+
self.name.clone(),
173+
);
174+
Ok(Box::new(ingestor))
175+
}
176176
}
177177

178178
pub struct TriggersAdapter {}

0 commit comments

Comments
 (0)