Skip to content

Commit 0189529

Browse files
committed
graph,store,core - add active_assigments() for unpaused subgraphs and wire it in subgraph startup
1 parent c41a021 commit 0189529

File tree

5 files changed

+42
-9
lines changed

5 files changed

+42
-9
lines changed

core/src/subgraph/registrar.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ where
226226
let logger = self.logger.clone();
227227
let node_id = self.node_id.clone();
228228

229-
future::result(self.store.assignments(&self.node_id))
229+
future::result(self.store.active_assignments(&self.node_id))
230230
.map_err(|e| anyhow!("Error querying subgraph assignments: {}", e))
231231
.and_then(move |deployments| {
232232
// This operation should finish only after all subgraphs are

graph/src/components/store/traits.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ pub trait SubgraphStore: Send + Sync + 'static {
106106

107107
fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;
108108

109+
/// Returns assignments that are not paused
110+
fn active_assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;
111+
109112
/// Return `true` if a subgraph `name` exists, regardless of whether the
110113
/// subgraph has any deployments attached to it
111114
fn subgraph_exists(&self, name: &SubgraphName) -> Result<bool, StoreError>;

node/src/manager/commands/assign.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ pub fn pause_or_resume(
7474
primary: ConnectionPool,
7575
sender: &NotificationSender,
7676
search: &DeploymentSearch,
77-
pause: bool,
77+
should_pause: bool,
7878
) -> Result<(), Error> {
7979
let locator = search.locate_unique(&primary)?;
8080

@@ -86,18 +86,22 @@ pub fn pause_or_resume(
8686
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;
8787

8888
let change = match conn.assignment_status(&site)? {
89-
Some((_, paused)) => {
90-
if paused == pause {
91-
println!("deployment {locator} is already {paused}");
92-
vec![]
93-
} else {
89+
Some((_, is_paused)) => {
90+
if should_pause {
91+
if is_paused {
92+
println!("deployment {locator} is already paused");
93+
return Ok(());
94+
}
9495
println!("pausing {locator}");
9596
conn.pause_subgraph(&site)?
97+
} else {
98+
println!("resuming {locator}");
99+
conn.resume_subgraph(&site)?
96100
}
97101
}
98102
None => {
99-
println!("resuming {locator}");
100-
conn.resume_subgraph(&site)?
103+
println!("deployment {locator} not found");
104+
return Ok(());
101105
}
102106
};
103107
println!("Operation completed");

store/postgres/src/primary.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,22 @@ mod queries {
588588
.collect::<Result<Vec<Site>, _>>()
589589
}
590590

591+
// All assignments for a node that are currently not paused
592+
pub(super) fn active_assignments(
593+
conn: &PgConnection,
594+
node: &NodeId,
595+
) -> Result<Vec<Site>, StoreError> {
596+
ds::table
597+
.inner_join(a::table.on(a::id.eq(ds::id)))
598+
.filter(a::node_id.eq(node.as_str()))
599+
.filter(a::paused_at.is_null())
600+
.select(ds::all_columns)
601+
.load::<Schema>(conn)?
602+
.into_iter()
603+
.map(Site::try_from)
604+
.collect::<Result<Vec<Site>, _>>()
605+
}
606+
591607
pub(super) fn fill_assignments(
592608
conn: &PgConnection,
593609
infos: &mut [status::Info],
@@ -1803,6 +1819,10 @@ impl Mirror {
18031819
self.read(|conn| queries::assignments(conn, node))
18041820
}
18051821

1822+
pub fn active_assignments(&self, node: &NodeId) -> Result<Vec<Site>, StoreError> {
1823+
self.read(|conn| queries::active_assignments(conn, node))
1824+
}
1825+
18061826
pub fn assigned_node(&self, site: &Site) -> Result<Option<NodeId>, StoreError> {
18071827
self.read(|conn| queries::assigned_node(conn, site))
18081828
}

store/postgres/src/subgraph_store.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,12 @@ impl SubgraphStoreTrait for SubgraphStore {
13041304
.map(|sites| sites.iter().map(|site| site.into()).collect())
13051305
}
13061306

1307+
fn active_assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError> {
1308+
self.mirror
1309+
.active_assignments(node)
1310+
.map(|sites| sites.iter().map(|site| site.into()).collect())
1311+
}
1312+
13071313
fn subgraph_exists(&self, name: &SubgraphName) -> Result<bool, StoreError> {
13081314
self.mirror.subgraph_exists(name)
13091315
}

0 commit comments

Comments
 (0)