Skip to content

Commit c41a021

Browse files
committed
core,graph,node,store: send StoreEvents for Pause and Resume and handle it
1 parent ba422c9 commit c41a021

File tree

5 files changed

+105
-15
lines changed

5 files changed

+105
-15
lines changed

core/src/subgraph/registrar.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,20 @@ where
171171
match operation {
172172
EntityChangeOperation::Set => {
173173
store
174-
.assigned_node(&deployment)
174+
.assignment_status(&deployment)
175175
.map_err(|e| {
176176
anyhow!("Failed to get subgraph assignment entity: {}", e)
177177
})
178178
.map(|assigned| -> Box<dyn Stream<Item = _, Error = _> + Send> {
179-
if let Some(assigned) = assigned {
179+
if let Some((assigned,is_paused)) = assigned {
180180
if assigned == node_id {
181+
182+
if is_paused{
183+
// Subgraph is paused, so we don't start it
184+
debug!(logger, "Deployment assignee is this node, but it is paused, so we don't start it"; "assigned_to" => assigned, "node_id" => &node_id,"paused" => is_paused);
185+
return Box::new(stream::empty());
186+
}
187+
181188
// Start subgraph on this node
182189
debug!(logger, "Deployment assignee is this node, broadcasting add event"; "assigned_to" => assigned, "node_id" => &node_id);
183190
Box::new(stream::once(Ok(AssignmentEvent::Add {

graph/src/components/store/traits.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,15 @@ pub trait SubgraphStore: Send + Sync + 'static {
9595

9696
fn assigned_node(&self, deployment: &DeploymentLocator) -> Result<Option<NodeId>, StoreError>;
9797

98+
/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
99+
/// the subgraph is assigned to, and `is_paused` is true if the
100+
/// subgraph is paused.
101+
/// Returns None if the deployment does not exist.
102+
fn assignment_status(
103+
&self,
104+
deployment: &DeploymentLocator,
105+
) -> Result<Option<(NodeId, bool)>, StoreError>;
106+
98107
fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;
99108

100109
/// Return `true` if a subgraph `name` exists, regardless of whether the

node/src/manager/commands/assign.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,23 @@ pub fn pause_or_resume(
8585
.locate_site(locator.clone())?
8686
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;
8787

88-
if pause {
89-
println!("pausing {locator}");
90-
conn.pause_subgraph(&site)?;
91-
println!("paused {locator}")
92-
} else {
93-
println!("resuming {locator}");
94-
conn.resume_subgraph(&site)?;
95-
println!("resumed {locator}")
96-
}
88+
let change = match conn.assignment_status(&site)? {
89+
Some((_, paused)) => {
90+
if paused == pause {
91+
println!("deployment {locator} is already {paused}");
92+
vec![]
93+
} else {
94+
println!("pausing {locator}");
95+
conn.pause_subgraph(&site)?
96+
}
97+
}
98+
None => {
99+
println!("resuming {locator}");
100+
conn.resume_subgraph(&site)?
101+
}
102+
};
103+
println!("Operation completed");
104+
conn.send_store_event(sender, &StoreEvent::new(change))?;
97105

98106
Ok(())
99107
}

store/postgres/src/primary.rs

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ pub fn make_dummy_site(deployment: DeploymentHash, namespace: Namespace, network
405405
/// mirrored through `Mirror::refresh_tables` and must be queries, i.e.,
406406
/// read-only
407407
mod queries {
408+
use diesel::data_types::PgTimestamp;
408409
use diesel::dsl::{any, exists, sql};
409410
use diesel::pg::PgConnection;
410411
use diesel::prelude::{
@@ -626,6 +627,36 @@ mod queries {
626627
.transpose()
627628
}
628629

630+
/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
631+
/// the subgraph is assigned to, and `is_paused` is true if the
632+
/// subgraph is paused.
633+
/// Returns None if the deployment does not exist.
634+
pub(super) fn assignment_status(
635+
conn: &PgConnection,
636+
site: &Site,
637+
) -> Result<Option<(NodeId, bool)>, StoreError> {
638+
a::table
639+
.filter(a::id.eq(site.id))
640+
.select((a::node_id, a::paused_at))
641+
.first::<(String, Option<PgTimestamp>)>(conn)
642+
.optional()?
643+
.map(|(node, ts)| {
644+
let node_id = NodeId::new(&node).map_err(|()| {
645+
constraint_violation!(
646+
"invalid node id `{}` in assignment for `{}`",
647+
node,
648+
site.deployment
649+
)
650+
})?;
651+
652+
match ts {
653+
Some(_) => Ok((node_id, true)),
654+
None => Ok((node_id, false)),
655+
}
656+
})
657+
.transpose()
658+
}
659+
629660
pub(super) fn version_info(
630661
conn: &PgConnection,
631662
version: &str,
@@ -981,7 +1012,7 @@ impl<'a> Connection<'a> {
9811012
}
9821013
}
9831014

984-
pub fn pause_subgraph(&self, site: &Site) -> Result<(), StoreError> {
1015+
pub fn pause_subgraph(&self, site: &Site) -> Result<Vec<EntityChange>, StoreError> {
9851016
use subgraph_deployment_assignment as a;
9861017

9871018
let conn = self.conn.as_ref();
@@ -991,7 +1022,11 @@ impl<'a> Connection<'a> {
9911022
.execute(conn)?;
9921023
match updates {
9931024
0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())),
994-
1 => Ok(()),
1025+
1 => {
1026+
let change =
1027+
EntityChange::for_assignment(site.into(), EntityChangeOperation::Removed);
1028+
Ok(vec![change])
1029+
}
9951030
_ => {
9961031
// `id` is the primary key of the subgraph_deployment_assignment table,
9971032
// and we can therefore only update no or one entry
@@ -1000,7 +1035,7 @@ impl<'a> Connection<'a> {
10001035
}
10011036
}
10021037

1003-
pub fn resume_subgraph(&self, site: &Site) -> Result<(), StoreError> {
1038+
pub fn resume_subgraph(&self, site: &Site) -> Result<Vec<EntityChange>, StoreError> {
10041039
use subgraph_deployment_assignment as a;
10051040

10061041
let conn = self.conn.as_ref();
@@ -1010,7 +1045,10 @@ impl<'a> Connection<'a> {
10101045
.execute(conn)?;
10111046
match updates {
10121047
0 => Err(StoreError::DeploymentNotFound(site.deployment.to_string())),
1013-
1 => Ok(()),
1048+
1 => {
1049+
let change = EntityChange::for_assignment(site.into(), EntityChangeOperation::Set);
1050+
Ok(vec![change])
1051+
}
10141052
_ => {
10151053
// `id` is the primary key of the subgraph_deployment_assignment table,
10161054
// and we can therefore only update no or one entry
@@ -1148,6 +1186,14 @@ impl<'a> Connection<'a> {
11481186
queries::assigned_node(self.conn.as_ref(), site)
11491187
}
11501188

1189+
/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
1190+
/// the subgraph is assigned to, and `is_paused` is true if the
1191+
/// subgraph is paused.
1192+
/// Returns None if the deployment does not exist.
1193+
pub fn assignment_status(&self, site: &Site) -> Result<Option<(NodeId, bool)>, StoreError> {
1194+
queries::assignment_status(self.conn.as_ref(), site)
1195+
}
1196+
11511197
/// Create a copy of the site `src` in the shard `shard`, but mark it as
11521198
/// not active. If there already is a site in `shard`, return that
11531199
/// instead.
@@ -1761,6 +1807,14 @@ impl Mirror {
17611807
self.read(|conn| queries::assigned_node(conn, site))
17621808
}
17631809

1810+
/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
1811+
/// the subgraph is assigned to, and `is_paused` is true if the
1812+
/// subgraph is paused.
1813+
/// Returns None if the deployment does not exist.
1814+
pub fn assignment_status(&self, site: &Site) -> Result<Option<(NodeId, bool)>, StoreError> {
1815+
self.read(|conn| queries::assignment_status(conn, site))
1816+
}
1817+
17641818
pub fn find_active_site(&self, subgraph: &DeploymentHash) -> Result<Option<Site>, StoreError> {
17651819
self.read(|conn| queries::find_active_site(conn, subgraph))
17661820
}

store/postgres/src/subgraph_store.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1286,6 +1286,18 @@ impl SubgraphStoreTrait for SubgraphStore {
12861286
self.mirror.assigned_node(site.as_ref())
12871287
}
12881288

1289+
/// Returns Option<(node_id,is_paused)> where `node_id` is the node that
1290+
/// the subgraph is assigned to, and `is_paused` is true if the
1291+
/// subgraph is paused.
1292+
/// Returns None if the deployment does not exist.
1293+
fn assignment_status(
1294+
&self,
1295+
deployment: &DeploymentLocator,
1296+
) -> Result<Option<(NodeId, bool)>, StoreError> {
1297+
let site = self.find_site(deployment.id.into())?;
1298+
self.mirror.assignment_status(site.as_ref())
1299+
}
1300+
12891301
fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError> {
12901302
self.mirror
12911303
.assignments(node)

0 commit comments

Comments
 (0)