Skip to content

Commit a0b1c87

Browse files
committed
refactor: generalize network subgraph into a subgraph client
This way we can also use it for the escrow subgraph and others that we may need in the future.
1 parent 5ee3d86 commit a0b1c87

File tree

6 files changed

+71
-58
lines changed

6 files changed

+71
-58
lines changed

common/src/allocations/monitor.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ use serde::Deserialize;
1111
use serde_json::json;
1212
use tokio::time::sleep;
1313

14-
use crate::prelude::NetworkSubgraph;
14+
use crate::prelude::SubgraphClient;
1515

1616
use super::Allocation;
1717

1818
async fn current_epoch(
19-
network_subgraph: &'static NetworkSubgraph,
19+
network_subgraph: &'static SubgraphClient,
2020
graph_network_id: u64,
2121
) -> Result<u64, anyhow::Error> {
2222
// Types for deserializing the network subgraph response
@@ -63,7 +63,7 @@ async fn current_epoch(
6363

6464
/// An always up-to-date list of an indexer's active and recently closed allocations.
6565
pub fn indexer_allocations(
66-
network_subgraph: &'static NetworkSubgraph,
66+
network_subgraph: &'static SubgraphClient,
6767
indexer_address: Address,
6868
graph_network_id: u64,
6969
interval: Duration,
@@ -195,22 +195,24 @@ mod test {
195195
Mock, MockServer, ResponseTemplate,
196196
};
197197

198-
use crate::{prelude::NetworkSubgraph, test_vectors};
198+
use crate::{prelude::SubgraphClient, test_vectors};
199199

200200
use super::*;
201201

202-
async fn setup_mock_network_subgraph() -> (&'static NetworkSubgraph, MockServer) {
202+
async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) {
203203
// Set up a mock network subgraph
204204
let mock_server = MockServer::start().await;
205-
let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint(
205+
let network_subgraph_endpoint = SubgraphClient::local_deployment_endpoint(
206206
&mock_server.uri(),
207207
&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT,
208-
);
209-
let network_subgraph = NetworkSubgraph::new(
208+
)
209+
.unwrap();
210+
let network_subgraph = SubgraphClient::new(
210211
Some(&mock_server.uri()),
211212
Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT),
212213
network_subgraph_endpoint.as_ref(),
213-
);
214+
)
215+
.unwrap();
214216

215217
// Mock result for current epoch requests
216218
mock_server

common/src/attestations/dispute_manager.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ use serde::Deserialize;
1010
use serde_json::json;
1111
use tokio::time::sleep;
1212

13-
use crate::network_subgraph::NetworkSubgraph;
13+
use crate::subgraph_client::SubgraphClient;
1414

1515
pub fn dispute_manager(
16-
network_subgraph: &'static NetworkSubgraph,
16+
network_subgraph: &'static SubgraphClient,
1717
graph_network_id: u64,
1818
interval: Duration,
1919
) -> Eventual<Address> {
@@ -88,24 +88,26 @@ mod test {
8888
};
8989

9090
use crate::{
91-
prelude::NetworkSubgraph,
91+
prelude::SubgraphClient,
9292
test_vectors::{self, DISPUTE_MANAGER_ADDRESS},
9393
};
9494

9595
use super::*;
9696

97-
async fn setup_mock_network_subgraph() -> (&'static NetworkSubgraph, MockServer) {
97+
async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) {
9898
// Set up a mock network subgraph
9999
let mock_server = MockServer::start().await;
100-
let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint(
100+
let network_subgraph_endpoint = SubgraphClient::local_deployment_endpoint(
101101
&mock_server.uri(),
102102
&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT,
103-
);
104-
let network_subgraph = NetworkSubgraph::new(
103+
)
104+
.unwrap();
105+
let network_subgraph = SubgraphClient::new(
105106
Some(&mock_server.uri()),
106107
Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT),
107108
network_subgraph_endpoint.as_ref(),
108-
);
109+
)
110+
.unwrap();
109111

110112
// Mock result for current epoch requests
111113
mock_server

common/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
pub mod allocations;
55
pub mod attestations;
66
pub mod graphql;
7-
pub mod network_subgraph;
87
pub mod signature_verification;
8+
pub mod subgraph_client;
99

1010
#[cfg(test)]
1111
mod test_vectors;
@@ -17,5 +17,5 @@ pub mod prelude {
1717
pub use super::attestations::{
1818
dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers,
1919
};
20-
pub use super::network_subgraph::NetworkSubgraph;
20+
pub use super::subgraph_client::SubgraphClient;
2121
}

common/src/network_subgraph/mod.rs renamed to common/src/subgraph_client.rs

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use std::sync::Arc;
55

6+
use anyhow::anyhow;
67
use graphql::http::Response;
78
use reqwest::{header, Client, Url};
89
use serde::de::Deserialize;
@@ -13,56 +14,60 @@ use toolshed::thegraph::DeploymentId;
1314
///
1415
/// This is Arc internally, so it can be cloned and shared between threads.
1516
#[derive(Debug, Clone)]
16-
pub struct NetworkSubgraph {
17+
pub struct SubgraphClient {
1718
client: Client, // it is Arc
18-
network_subgraph_url: Arc<Url>,
19+
subgraph_url: Arc<Url>,
1920
}
2021

21-
impl NetworkSubgraph {
22+
impl SubgraphClient {
2223
pub fn new(
2324
graph_node_query_endpoint: Option<&str>,
2425
deployment: Option<&DeploymentId>,
25-
network_subgraph_url: &str,
26-
) -> NetworkSubgraph {
27-
//TODO: Check indexing status of the local network subgraph deployment
28-
//if the deployment is healthy and synced, use local_network_subgraph_endpoint
29-
let _local_network_subgraph_endpoint = match (graph_node_query_endpoint, deployment) {
30-
(Some(endpoint), Some(id)) => {
31-
Some(NetworkSubgraph::local_deployment_endpoint(endpoint, id))
32-
}
26+
subgraph_url: &str,
27+
) -> Result<Self, anyhow::Error> {
28+
// TODO: Check indexing status of the local subgraph deployment
29+
// if the deployment is healthy and synced, use local_subgraoh_endpoint
30+
let _local_subgraph_endpoint = match (graph_node_query_endpoint, deployment) {
31+
(Some(endpoint), Some(id)) => Some(Self::local_deployment_endpoint(endpoint, id)?),
3332
_ => None,
3433
};
3534

36-
let network_subgraph_url =
37-
Url::parse(network_subgraph_url).expect("Could not parse network subgraph url");
35+
let subgraph_url = Url::parse(subgraph_url)
36+
.map_err(|e| anyhow!("Could not parse subgraph url `{}`: {}", subgraph_url, e))?;
3837

3938
let client = reqwest::Client::builder()
40-
.user_agent("indexer-service")
39+
.user_agent("indexer-common")
4140
.build()
42-
.expect("Could not build a client to graph node query endpoint");
41+
.expect("Could not build a client for the Graph Node query endpoint");
4342

44-
NetworkSubgraph {
43+
Ok(Self {
4544
client,
46-
network_subgraph_url: Arc::new(network_subgraph_url),
47-
}
45+
subgraph_url: Arc::new(subgraph_url),
46+
})
4847
}
4948

5049
pub fn local_deployment_endpoint(
5150
graph_node_query_endpoint: &str,
5251
deployment: &DeploymentId,
53-
) -> Url {
52+
) -> Result<Url, anyhow::Error> {
5453
Url::parse(graph_node_query_endpoint)
5554
.and_then(|u| u.join("/subgraphs/id/"))
5655
.and_then(|u| u.join(&deployment.to_string()))
57-
.expect("Could not parse graph node query endpoint for the network subgraph deployment")
56+
.map_err(|e| {
57+
anyhow!(
58+
"Could not parse Graph Node query endpoint for subgraph deployment `{}`: {}",
59+
deployment,
60+
e
61+
)
62+
})
5863
}
5964

6065
pub async fn query<T: for<'de> Deserialize<'de>>(
6166
&self,
6267
body: &Value,
6368
) -> Result<Response<T>, reqwest::Error> {
6469
self.client
65-
.post(Url::clone(&self.network_subgraph_url))
70+
.post(Url::clone(&self.subgraph_url))
6671
.json(body)
6772
.header(header::CONTENT_TYPE, "application/json")
6873
.send()
@@ -111,20 +116,21 @@ mod test {
111116
mock_server
112117
}
113118

114-
fn network_subgraph() -> NetworkSubgraph {
115-
NetworkSubgraph::new(
119+
fn network_subgraph_client() -> SubgraphClient {
120+
SubgraphClient::new(
116121
Some(GRAPH_NODE_STATUS_ENDPOINT),
117122
Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT),
118123
NETWORK_SUBGRAPH_URL,
119124
)
125+
.unwrap()
120126
}
121127

122128
#[tokio::test]
123129
async fn test_network_query() {
124130
let _mock_server = mock_graph_node_server().await;
125131

126132
// Check that the response is valid JSON
127-
let result = network_subgraph()
133+
let result = network_subgraph_client()
128134
.query::<Value>(&json!({
129135
"query": r#"
130136
query {

service/src/main.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use toolshed::thegraph::DeploymentId;
1010
use tracing::info;
1111

1212
use indexer_common::prelude::{
13-
attestation_signers, dispute_manager, indexer_allocations, NetworkSubgraph,
13+
attestation_signers, dispute_manager, indexer_allocations, SubgraphClient,
1414
};
1515

1616
use util::{package_version, shutdown_signal};
@@ -68,17 +68,20 @@ async fn main() -> Result<(), std::io::Error> {
6868
// a static lifetime, which avoids having to pass around and clone `Arc`
6969
// objects everywhere. Since the network subgraph is read-only, this is
7070
// no problem.
71-
let network_subgraph = Box::leak(Box::new(NetworkSubgraph::new(
72-
Some(&config.indexer_infrastructure.graph_node_query_endpoint),
73-
config
74-
.network_subgraph
75-
.network_subgraph_deployment
76-
.map(|s| DeploymentId::from_str(&s))
77-
.transpose()
78-
.expect("Failed to parse invalid network subgraph deployment")
79-
.as_ref(),
80-
&config.network_subgraph.network_subgraph_endpoint,
81-
)));
71+
let network_subgraph = Box::leak(Box::new(
72+
SubgraphClient::new(
73+
Some(&config.indexer_infrastructure.graph_node_query_endpoint),
74+
config
75+
.network_subgraph
76+
.network_subgraph_deployment
77+
.map(|s| DeploymentId::from_str(&s))
78+
.transpose()
79+
.expect("Failed to parse invalid network subgraph deployment")
80+
.as_ref(),
81+
&config.network_subgraph.network_subgraph_endpoint,
82+
)
83+
.expect("Failed to set up network subgraph client"),
84+
));
8285

8386
let indexer_allocations = indexer_allocations(
8487
network_subgraph,

service/src/server/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use tower_http::{
1919
};
2020
use tracing::Level;
2121

22-
use indexer_common::prelude::NetworkSubgraph;
22+
use indexer_common::prelude::SubgraphClient;
2323

2424
use crate::{
2525
query_processor::QueryProcessor,
@@ -38,7 +38,7 @@ pub struct ServerOptions {
3838
pub graph_node_status_endpoint: String,
3939
pub indexer_management_db: PgPool,
4040
pub operator_public_key: String,
41-
pub network_subgraph: &'static NetworkSubgraph,
41+
pub network_subgraph: &'static SubgraphClient,
4242
pub network_subgraph_auth_token: Option<String>,
4343
pub serve_network_subgraph: bool,
4444
}
@@ -53,7 +53,7 @@ impl ServerOptions {
5353
graph_node_status_endpoint: String,
5454
indexer_management_db: PgPool,
5555
operator_public_key: String,
56-
network_subgraph: &'static NetworkSubgraph,
56+
network_subgraph: &'static SubgraphClient,
5757
network_subgraph_auth_token: Option<String>,
5858
serve_network_subgraph: bool,
5959
) -> Self {

0 commit comments

Comments
 (0)