Skip to content

Commit e3168ec

Browse files
authored
Improve subgraph running count metrics (#4401)
- deployment_count now shows the total number of subgraphs deployed to the node - deployment_running_count now shows the count of running subgraphs, which decreases when a subgraph is retrying
1 parent 860becc commit e3168ec

File tree

7 files changed

+77
-27
lines changed

7 files changed

+77
-27
lines changed

core/src/subgraph/context.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,30 +26,26 @@ use self::instance::SubgraphInstance;
2626
#[derive(Clone, Debug)]
2727
pub struct SubgraphKeepAlive {
2828
alive_map: Arc<RwLock<HashMap<DeploymentId, CancelGuard>>>,
29-
manager_metrics: Arc<SubgraphCountMetric>,
29+
sg_metrics: Arc<SubgraphCountMetric>,
3030
}
3131

32-
impl CheapClone for SubgraphKeepAlive {
33-
fn cheap_clone(&self) -> Self {
34-
self.clone()
35-
}
36-
}
32+
impl CheapClone for SubgraphKeepAlive {}
3733

3834
impl SubgraphKeepAlive {
39-
pub fn new(metrics_registry: Arc<dyn MetricsRegistry>) -> Self {
35+
pub fn new(sg_metrics: Arc<SubgraphCountMetric>) -> Self {
4036
Self {
41-
manager_metrics: Arc::new(SubgraphCountMetric::new(metrics_registry)),
37+
sg_metrics,
4238
alive_map: Arc::new(RwLock::new(HashMap::default())),
4339
}
4440
}
4541

4642
pub fn remove(&self, deployment_id: &DeploymentId) {
4743
self.alive_map.write().unwrap().remove(deployment_id);
48-
self.manager_metrics.subgraph_count.dec();
44+
self.sg_metrics.running_count.dec();
4945
}
5046
pub fn insert(&self, deployment_id: DeploymentId, guard: CancelGuard) {
5147
self.alive_map.write().unwrap().insert(deployment_id, guard);
52-
self.manager_metrics.subgraph_count.inc();
48+
self.sg_metrics.running_count.inc();
5349
}
5450
}
5551

core/src/subgraph/instance_manager.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
161161
env_vars: Arc<EnvVars>,
162162
subgraph_store: Arc<S>,
163163
chains: Arc<BlockchainMap>,
164+
sg_metrics: Arc<SubgraphCountMetric>,
164165
metrics_registry: Arc<dyn MetricsRegistry>,
165166
link_resolver: Arc<dyn LinkResolver>,
166167
ipfs_service: IpfsService,
@@ -174,7 +175,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
174175
subgraph_store,
175176
chains,
176177
metrics_registry: metrics_registry.cheap_clone(),
177-
instances: SubgraphKeepAlive::new(metrics_registry),
178+
instances: SubgraphKeepAlive::new(sg_metrics),
178179
link_resolver,
179180
ipfs_service,
180181
static_filters,

core/src/subgraph/provider.rs

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,42 @@ use graph::{
88
prelude::{SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, *},
99
};
1010

11+
#[derive(Debug)]
12+
struct DeploymentRegistry {
13+
subgraphs_deployed: Arc<Mutex<HashSet<DeploymentId>>>,
14+
subgraph_metrics: Arc<SubgraphCountMetric>,
15+
}
16+
17+
impl DeploymentRegistry {
18+
fn new(subgraph_metrics: Arc<SubgraphCountMetric>) -> Self {
19+
Self {
20+
subgraphs_deployed: Arc::new(Mutex::new(HashSet::new())),
21+
subgraph_metrics,
22+
}
23+
}
24+
25+
fn insert(&self, id: DeploymentId) -> bool {
26+
if !self.subgraphs_deployed.lock().unwrap().insert(id) {
27+
return false;
28+
}
29+
30+
self.subgraph_metrics.deployment_count.inc();
31+
true
32+
}
33+
34+
fn remove(&self, id: &DeploymentId) -> bool {
35+
if !self.subgraphs_deployed.lock().unwrap().remove(id) {
36+
return false;
37+
}
38+
39+
self.subgraph_metrics.deployment_count.dec();
40+
true
41+
}
42+
}
43+
1144
pub struct SubgraphAssignmentProvider<I> {
1245
logger_factory: LoggerFactory,
13-
subgraphs_running: Arc<Mutex<HashSet<DeploymentId>>>,
46+
deployment_registry: DeploymentRegistry,
1447
link_resolver: Arc<dyn LinkResolver>,
1548
instance_manager: Arc<I>,
1649
}
@@ -20,16 +53,17 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProvider<I> {
2053
logger_factory: &LoggerFactory,
2154
link_resolver: Arc<dyn LinkResolver>,
2255
instance_manager: I,
56+
subgraph_metrics: Arc<SubgraphCountMetric>,
2357
) -> Self {
2458
let logger = logger_factory.component_logger("SubgraphAssignmentProvider", None);
2559
let logger_factory = logger_factory.with_parent(logger.clone());
2660

2761
// Create the subgraph provider
2862
SubgraphAssignmentProvider {
2963
logger_factory,
30-
subgraphs_running: Arc::new(Mutex::new(HashSet::new())),
3164
link_resolver: link_resolver.with_retries().into(),
3265
instance_manager: Arc::new(instance_manager),
66+
deployment_registry: DeploymentRegistry::new(subgraph_metrics),
3367
}
3468
}
3569
}
@@ -44,7 +78,7 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
4478
let logger = self.logger_factory.subgraph_logger(&loc);
4579

4680
// If subgraph ID already in set
47-
if !self.subgraphs_running.lock().unwrap().insert(loc.id) {
81+
if !self.deployment_registry.insert(loc.id) {
4882
info!(logger, "Subgraph deployment is already running");
4983

5084
return Err(SubgraphAssignmentProviderError::AlreadyRunning(
@@ -74,12 +108,7 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
74108
deployment: DeploymentLocator,
75109
) -> Result<(), SubgraphAssignmentProviderError> {
76110
// If subgraph ID was in set
77-
if self
78-
.subgraphs_running
79-
.lock()
80-
.unwrap()
81-
.remove(&deployment.id)
82-
{
111+
if self.deployment_registry.remove(&deployment.id) {
83112
// Shut down subgraph processing
84113
self.instance_manager.stop_subgraph(deployment).await;
85114
}

graph/src/components/metrics/subgraph.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,30 @@ impl SubgraphInstanceMetrics {
8888

8989
#[derive(Debug)]
9090
pub struct SubgraphCountMetric {
91-
pub subgraph_count: Box<Gauge>,
91+
pub running_count: Box<Gauge>,
92+
pub deployment_count: Box<Gauge>,
9293
}
9394

9495
impl SubgraphCountMetric {
9596
pub fn new(registry: Arc<dyn MetricsRegistry>) -> Self {
96-
let subgraph_count = registry
97+
let running_count = registry
9798
.new_gauge(
98-
"deployment_count",
99+
"deployment_running_count",
99100
"Counts the number of deployments currently being indexed by the graph-node.",
100101
HashMap::new(),
101102
)
102103
.expect("failed to create `deployment_count` gauge");
103-
Self { subgraph_count }
104+
let deployment_count = registry
105+
.new_gauge(
106+
"deployment_count",
107+
"Counts the number of deployments currently deployed to the graph-node.",
108+
HashMap::new(),
109+
)
110+
.expect("failed to create `deployment_count` gauge");
111+
Self {
112+
running_count,
113+
deployment_count,
114+
}
104115
}
105116
}
106117

node/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,11 +440,14 @@ async fn main() {
440440
}
441441
let static_filters = ENV_VARS.experimental_static_filters;
442442

443+
let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone()));
444+
443445
let subgraph_instance_manager = SubgraphInstanceManager::new(
444446
&logger_factory,
445447
env_vars.cheap_clone(),
446448
network_store.subgraph_store(),
447449
blockchain_map.cheap_clone(),
450+
sg_count.cheap_clone(),
448451
metrics_registry.clone(),
449452
link_resolver.clone(),
450453
ipfs_service,
@@ -456,6 +459,7 @@ async fn main() {
456459
&logger_factory,
457460
link_resolver.clone(),
458461
subgraph_instance_manager,
462+
sg_count,
459463
);
460464

461465
// Check version switching mode environment variable

node/src/manager/commands/run.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use graph::env::EnvVars;
2121
use graph::firehose::FirehoseEndpoints;
2222
use graph::prelude::{
2323
anyhow, tokio, BlockNumber, DeploymentHash, LoggerFactory, NodeId, SubgraphAssignmentProvider,
24-
SubgraphName, SubgraphRegistrar, SubgraphStore, SubgraphVersionSwitchingMode, ENV_VARS,
24+
SubgraphCountMetric, SubgraphName, SubgraphRegistrar, SubgraphStore,
25+
SubgraphVersionSwitchingMode, ENV_VARS,
2526
};
2627
use graph::slog::{debug, info, Logger};
2728
use graph_chain_ethereum as ethereum;
@@ -148,12 +149,15 @@ pub async fn run(
148149

149150
let static_filters = ENV_VARS.experimental_static_filters;
150151

152+
let sg_metrics = Arc::new(SubgraphCountMetric::new(metrics_registry.clone()));
153+
151154
let blockchain_map = Arc::new(blockchain_map);
152155
let subgraph_instance_manager = SubgraphInstanceManager::new(
153156
&logger_factory,
154157
env_vars.cheap_clone(),
155158
subgraph_store.clone(),
156159
blockchain_map.clone(),
160+
sg_metrics.cheap_clone(),
157161
metrics_registry.clone(),
158162
link_resolver.cheap_clone(),
159163
ipfs_service,
@@ -165,6 +169,7 @@ pub async fn run(
165169
&logger_factory,
166170
link_resolver.cheap_clone(),
167171
subgraph_instance_manager,
172+
sg_metrics,
168173
));
169174

170175
let panicking_subscription_manager = Arc::new(PanicSubscriptionManager {});

tests/src/fixture/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ use graph::prelude::ethabi::ethereum_types::H256;
2626
use graph::prelude::serde_json::{self, json};
2727
use graph::prelude::{
2828
async_trait, r, ApiVersion, BigInt, BlockNumber, DeploymentHash, GraphQlRunner as _,
29-
LoggerFactory, MetricsRegistry, NodeId, QueryError, SubgraphAssignmentProvider, SubgraphName,
30-
SubgraphRegistrar, SubgraphStore as _, SubgraphVersionSwitchingMode, TriggerProcessor,
29+
LoggerFactory, MetricsRegistry, NodeId, QueryError, SubgraphAssignmentProvider,
30+
SubgraphCountMetric, SubgraphName, SubgraphRegistrar, SubgraphStore as _,
31+
SubgraphVersionSwitchingMode, TriggerProcessor,
3132
};
3233
use graph::slog::crit;
3334
use graph_core::polling_monitor::ipfs_service;
@@ -354,13 +355,15 @@ pub async fn setup<C: Blockchain>(
354355
env_vars.mappings.ipfs_timeout,
355356
env_vars.mappings.ipfs_request_limit,
356357
);
358+
let sg_count = Arc::new(SubgraphCountMetric::new(mock_registry.cheap_clone()));
357359

358360
let blockchain_map = Arc::new(blockchain_map);
359361
let subgraph_instance_manager = SubgraphInstanceManager::new(
360362
&logger_factory,
361363
env_vars.cheap_clone(),
362364
subgraph_store.clone(),
363365
blockchain_map.clone(),
366+
sg_count.cheap_clone(),
364367
mock_registry.clone(),
365368
link_resolver.cheap_clone(),
366369
ipfs_service,
@@ -391,6 +394,7 @@ pub async fn setup<C: Blockchain>(
391394
&logger_factory,
392395
link_resolver.cheap_clone(),
393396
subgraph_instance_manager.clone(),
397+
sg_count,
394398
));
395399

396400
let panicking_subscription_manager = Arc::new(PanicSubscriptionManager {});

0 commit comments

Comments
 (0)