Skip to content

update rel cypher queries for migrated kind nodes #6433

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 12, 2025
Merged
25 changes: 24 additions & 1 deletion backend/infrahub/core/query/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,32 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
self.params["branch"] = self.branch.name
self.params["branch_level"] = self.branch.hierarchy_level

if self.branch.is_global or self.branch.is_default:
node_query_match = """
MATCH (n:Node { uuid: $uuid })
OPTIONAL MATCH (n)-[delete_edge:IS_PART_OF {status: "deleted", branch: $branch}]->(:Root)
WHERE delete_edge.from <= $at
WITH n WHERE delete_edge IS NULL
"""
else:
node_filter, node_filter_params = self.branch.get_query_filter_path(at=self.at, variable_name="r")
node_query_match = """
MATCH (n:Node { uuid: $uuid })
CALL {
WITH n
MATCH (n)-[r:IS_PART_OF]->(:Root)
WHERE %(node_filter)s
RETURN r.status = "active" AS is_active
ORDER BY r.from DESC
LIMIT 1
}
WITH n WHERE is_active = TRUE
""" % {"node_filter": node_filter}
self.params.update(node_filter_params)
self.add_to_query(node_query_match)

query = """
MATCH (root:Root)
MATCH (n:Node { uuid: $uuid })
CREATE (n)-[r:IS_PART_OF { branch: $branch, branch_level: $branch_level, status: "deleted", from: $at }]->(root)
"""

Expand Down
294 changes: 237 additions & 57 deletions backend/infrahub/core/query/relationship.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion backend/infrahub/core/relationship/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ async def delete(self, db: InfrahubDatabase, at: Timestamp | None = None) -> Non
await update_relationships_to(rel_ids_to_update, to=delete_at, db=db)

delete_query = await RelationshipDeleteQuery.init(
db=db, rel=self, source_id=node.id, destination_id=peer.id, branch=branch, at=delete_at
db=db, rel=self, source=node, destination=peer, branch=branch, at=delete_at
)
await delete_query.execute(db=db)

Expand Down
28 changes: 28 additions & 0 deletions backend/tests/helpers/db_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,31 @@ async def validate_node_relationships(node: Node, branch: Branch, db: InfrahubDa
for result in query.results:
print(result)
assert len(result.data) == 1 and result.data[0] == "Edges state is correct"


async def verify_no_duplicate_paths(db: InfrahubDatabase) -> None:
"""Verify that no duplicate paths exist at the database level"""
query = """
MATCH path = (p)-[e]->(q)
WITH
%(id_func)s(p) AS node_id1,
e.branch AS branch,
e.from AS from_time,
type(e) AS edge_type,
%(id_func)s(q) AS node_id2,
path
WITH node_id1, branch, from_time, edge_type, node_id2, size(collect(path)) AS num_paths
WHERE num_paths > 1
RETURN node_id1, branch, from_time, edge_type, node_id2, num_paths
""" % {"id_func": db.get_id_function_name()}
records = await db.execute_query(query=query)
for record in records:
node_id1 = record.get("node_id1")
branch = record.get("branch")
from_time = record.get("from_time")
edge_type = record.get("edge_type")
node_id2 = record.get("node_id2")
num_paths = record.get("num_paths")
raise ValueError(
f"{num_paths} paths ({branch=},{edge_type=},{from_time=}) between nodes '{node_id1}' and '{node_id2}'"
)
24 changes: 1 addition & 23 deletions backend/tests/unit/core/diff/test_diff_and_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,13 @@
from infrahub.core.timestamp import Timestamp
from infrahub.database import InfrahubDatabase
from infrahub.dependencies.registry import get_component_registry
from tests.helpers.db_validation import verify_no_duplicate_paths
from tests.unit.conftest import _build_hierarchical_location_data
from tests.unit.core.test_utils import verify_all_linked_edges_deleted

from .get_one_node import get_one_diff_node


async def verify_no_duplicate_paths(db: InfrahubDatabase) -> None:
"""Verify that no duplicate paths exist at the database level"""
query = """
MATCH path = (p)-[e]->(q)
WITH COALESCE(p.uuid, p.value) AS node_id1, e.branch AS branch, e.from AS from_time, type(e) AS edge_type, COALESCE(q.uuid, q.value) AS node_id2, path
WHERE node_id1 IS NOT NULL AND node_id2 IS NOT NULL
WITH node_id1, branch, from_time, edge_type, node_id2, size(collect(path)) AS num_paths
WHERE num_paths > 1
RETURN node_id1, branch, from_time, edge_type, node_id2, num_paths
"""
records = await db.execute_query(query=query)
for record in records:
node_id1 = record.get("node_id1")
branch = record.get("branch")
from_time = record.get("from_time")
edge_type = record.get("edge_type")
node_id2 = record.get("node_id2")
num_paths = record.get("num_paths")
raise ValueError(
f"{num_paths} paths ({branch=},{edge_type=},{from_time=}) between nodes '{node_id1}' and '{node_id2}'"
)


class TestDiffAndMerge:
@pytest.fixture
async def diff_repository(self, db: InfrahubDatabase, default_branch: Branch) -> DiffRepository:
Expand Down
46 changes: 46 additions & 0 deletions backend/tests/unit/core/test_relationship.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,49 @@ async def test_relationship_assign_from_pool(
await obj.save(db=db)

assert await obj.prefix.get_peer(db=db)


async def test_relationship_timestamp_changes(
db: InfrahubDatabase, person_jack_main: Node, tag_blue_main: Node, tag_red_main: Node, branch: Branch
):
# test going back in time after adding a relationship
before_add = Timestamp()
person_jack = await NodeManager.get_one(db=db, branch=branch, id=person_jack_main.id)
await person_jack.tags.update(db=db, data=[tag_blue_main.id])
await person_jack.save(db=db)
before_add_person_jack = await NodeManager.get_one(
db=db, branch=branch, id=person_jack_main.id, at=before_add, prefetch_relationships=True
)
tag_rels = await before_add_person_jack.tags.get_relationships(db=db)
assert not tag_rels

# test going back in time after deleting a relationship
before_remove = Timestamp()
person_jack = await NodeManager.get_one(db=db, branch=branch, id=person_jack_main.id)
await person_jack.tags.update(db=db, data=[None])
await person_jack.save(db=db)
before_remove_person_jack = await NodeManager.get_one(
db=db, branch=branch, id=person_jack_main.id, at=before_remove, prefetch_relationships=True
)
tag_rels = await before_remove_person_jack.tags.get_relationships(db=db)
assert len(tag_rels) == 1
assert [r.peer_id for r in tag_rels] == [tag_blue_main.id]

# test with manually set save time
save_time = Timestamp()
before_save = save_time.add(microseconds=-1)
after_save = save_time.add(microseconds=1)
person_jack = await NodeManager.get_one(db=db, branch=branch, id=person_jack_main.id)
await person_jack.tags.update(db=db, data=[tag_red_main.id])
await person_jack.save(db=db, at=save_time)
before_save_person_jack = await NodeManager.get_one(
db=db, branch=branch, id=person_jack_main.id, at=before_save, prefetch_relationships=True
)
tag_rels = await before_save_person_jack.tags.get_relationships(db=db)
assert len(tag_rels) == 0
after_save_person_jack = await NodeManager.get_one(
db=db, branch=branch, id=person_jack_main.id, at=after_save, prefetch_relationships=True
)
tag_rels = await after_save_person_jack.tags.get_relationships(db=db)
assert len(tag_rels) == 1
assert [r.peer_id for r in tag_rels] == [tag_red_main.id]
137 changes: 137 additions & 0 deletions backend/tests/unit/core/test_relationship_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from infrahub.core.timestamp import Timestamp
from infrahub.core.utils import get_paths_between_nodes
from infrahub.database import InfrahubDatabase
from tests.helpers.db_validation import verify_no_duplicate_paths


class DummyRelationshipQuery(RelationshipQuery):
Expand Down Expand Up @@ -244,6 +245,7 @@ async def test_query_RelationshipCreateQuery_for_node_with_migrated_kind(
relationships=["IS_RELATED"],
)
assert len(paths) == 0
await verify_no_duplicate_paths(db=db)


async def test_query_RelationshipDeleteQuery(
Expand Down Expand Up @@ -377,6 +379,55 @@ def get_active_path_and_rel(all_paths, previous_rel: str):
assert len(paths) == 8


async def test_query_RelationshipDeleteQuery_on_migrated_kind_node(
db: InfrahubDatabase, tag_blue_main: Node, person_jack_tags_main: Node, branch: Branch
):
person_schema = registry.schema.get(name="TestPerson")
rel_schema = person_schema.get_relationship("tags")
paths = await get_paths_between_nodes(
db=db,
source_id=tag_blue_main.db_id,
destination_id=person_jack_tags_main.db_id,
max_length=2,
relationships=["IS_RELATED"],
)
assert len(paths) == 1

# migrate person kind
person_schema.name = "NewPerson"
person_schema.namespace = "Test2"
assert person_schema.kind == "Test2NewPerson"
registry.schema.set(name="Test2NewPerson", schema=person_schema, branch=branch.name)
migration = NodeKindUpdateMigration(
previous_node_schema=registry.schema.get(name="TestPerson", branch=branch),
new_node_schema=person_schema,
schema_path=SchemaPath(
path_type=SchemaPathType.ATTRIBUTE, schema_kind="Test2NewPerson", field_name="namespace"
),
)
execution_result = await migration.execute(db=db, branch=branch)
assert not execution_result.errors

migrated_jack = await NodeManager.get_one(db=db, branch=branch, id=person_jack_tags_main.id)
tag_rels = await migrated_jack.tags.get_relationships(db=db)
assert len(tag_rels) == 2
blue_tag_rels = [tag_rel for tag_rel in tag_rels if tag_rel.peer_id == tag_blue_main.id]
assert len(blue_tag_rels) == 1
blue_tag_rel = blue_tag_rels[0]

query = await RelationshipDeleteQuery.init(
db=db,
source=migrated_jack,
destination=tag_blue_main,
schema=rel_schema,
rel=blue_tag_rel,
branch=branch,
at=Timestamp(),
)
await query.execute(db=db)
await verify_no_duplicate_paths(db=db)


async def test_relationship_delete_peer(db: InfrahubDatabase, default_branch, tag_blue_main: Node):
person = await Node.init(db=db, branch=default_branch, schema="TestPerson")
await person.new(db=db, firstname="Kara", lastname="Thrace", tags=[tag_blue_main])
Expand Down Expand Up @@ -805,6 +856,92 @@ async def test_query_RelationshipDataDeleteQuery(
assert len(paths) == 4


async def test_query_RelationshipDataDeleteQuery_on_migrated_kind_node(
db: InfrahubDatabase, tag_blue_main: Node, tag_red_main: Node, person_jack_tags_main: Node, branch: Branch
):
person_schema = registry.schema.get(name="TestPerson", branch=branch)
rel_schema = person_schema.get_relationship("tags")

# migrate person kind
person_schema.name = "NewPerson"
person_schema.namespace = "Test2"
assert person_schema.kind == "Test2NewPerson"
registry.schema.set(name="Test2NewPerson", schema=person_schema, branch=branch.name)
migration = NodeKindUpdateMigration(
previous_node_schema=registry.schema.get(name="TestPerson", branch=branch),
new_node_schema=person_schema,
schema_path=SchemaPath(
path_type=SchemaPathType.ATTRIBUTE, schema_kind="Test2NewPerson", field_name="namespace"
),
)
execution_result = await migration.execute(db=db, branch=branch)
assert not execution_result.errors

migrated_jack = await NodeManager.get_one(db=db, branch=branch, id=person_jack_tags_main.id)
# Query the existing relationship in RelationshipPeerData format
query1 = await RelationshipGetPeerQuery.init(
db=db,
source=migrated_jack,
schema=rel_schema,
rel=Relationship(schema=rel_schema, branch=branch, node=migrated_jack),
)
await query1.execute(db=db)
peers_database: dict[str, RelationshipPeerData] = {peer.peer_id: peer for peer in query1.get_peers()}

# Delete the relationship
query2 = await RelationshipDataDeleteQuery.init(
db=db,
branch=branch,
source=migrated_jack,
data=peers_database[tag_blue_main.id],
schema=rel_schema,
rel=Relationship,
)
await query2.execute(db=db)
await verify_no_duplicate_paths(db=db)

# migrate tag kind
tag_schema = registry.schema.get("BuiltinTag", branch=branch)
tag_schema.name = "NewTag"
tag_schema.namespace = "Builtin2"
assert tag_schema.kind == "Builtin2NewTag"
registry.schema.set(name="Builtin2NewTag", schema=tag_schema, branch=branch.name)
migration = NodeKindUpdateMigration(
previous_node_schema=registry.schema.get(name="BuiltinTag", branch=branch),
new_node_schema=tag_schema,
schema_path=SchemaPath(
path_type=SchemaPathType.ATTRIBUTE, schema_kind="Builtin2NewTag", field_name="namespace"
),
)
execution_result = await migration.execute(db=db, branch=branch)
assert not execution_result.errors

# delete other tag relationship
rel_schema.peer = "Builtin2NewTag"
migrated_jack = await NodeManager.get_one(db=db, branch=branch, id=person_jack_tags_main.id)
# Query the existing relationship in RelationshipPeerData format
query1 = await RelationshipGetPeerQuery.init(
db=db,
source=migrated_jack,
schema=rel_schema,
rel=Relationship(schema=rel_schema, branch=branch, node=migrated_jack),
)
await query1.execute(db=db)
peers_database: dict[str, RelationshipPeerData] = {peer.peer_id: peer for peer in query1.get_peers()}

# Delete the relationship
query2 = await RelationshipDataDeleteQuery.init(
db=db,
branch=branch,
source=migrated_jack,
data=peers_database[tag_red_main.id],
schema=rel_schema,
rel=Relationship,
)
await query2.execute(db=db)
await verify_no_duplicate_paths(db=db)


async def test_query_RelationshipCountPerNodeQuery(
db: InfrahubDatabase,
person_john_main,
Expand Down
2 changes: 1 addition & 1 deletion changelog/+rel_create_on_migrated_kind_node.fixed.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Prevent creating duplicate edges on the database when adding a relationship to a node that had its kind or inheritance updated
Prevent creating duplicate edges on the database when adding a relationship to or deleting a relationship from a node that had its kind or inheritance updated
Loading