Skip to content

Closes #251: Preserve SQL index names when provisioning a branch #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions netbox_branching/migrations/0003_rename_indexes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from django.db import connection, migrations

from netbox.plugins import get_plugin_config
from netbox_branching.choices import BranchStatusChoices
from netbox_branching.constants import MAIN_SCHEMA
from netbox_branching.utilities import get_sql_results

# Indexes to ignore as they are removed in a NetBox v4.3 migration
SKIP = (
'dcim_cabletermination_termination_type_id_termination_id_idx', # Removed in dcim.0207_remove_redundant_indexes
'vpn_l2vpntermination_assigned_object_type_id_assigned_objec_idx', # Removed in vpn.0009_remove_redundant_indexes
'vpn_tunneltermination_termination_type_id_termination_id_idx', # Removed in vpn.0009_remove_redundant_indexes
)


def rename_indexes(apps, schema_editor):
"""
Rename all indexes within each branch to match the main schema.
"""
Branch = apps.get_model('netbox_branching', 'Branch')
schema_prefix = get_plugin_config('netbox_branching', 'schema_prefix')

with connection.cursor() as cursor:

for branch in Branch.objects.filter(status=BranchStatusChoices.READY):
print(f'\n Renaming indexes for branch {branch.name} ({branch.schema_id})...', end='')
schema_name = f'{schema_prefix}{branch.schema_id}'

# Fetch all SQL indexes from the branch schema
cursor.execute(
f"SELECT tablename, indexname, indexdef FROM pg_indexes WHERE schemaname = '{schema_name}'"
)
branch_indexes = get_sql_results(cursor)

for branch_index in branch_indexes:

# Skip index if applicable
if branch_index.indexname in SKIP:
continue

# Find the matching index in main based on its table & definition
definition = branch_index.indexdef.split(' USING ', maxsplit=1)[1]
cursor.execute(
"SELECT indexname FROM pg_indexes WHERE schemaname=%s AND tablename=%s AND indexdef LIKE %s",
[MAIN_SCHEMA, branch_index.tablename, f'% {definition}']
)
if result := cursor.fetchone():
new_name = result[0]
if new_name != branch_index.indexname:
sql = f"ALTER INDEX {schema_name}.{branch_index.indexname} RENAME TO {new_name}"
try:
cursor.execute(sql)
except Exception as e:
print(sql)
raise e

print('\n ', end='') # Padding for final "OK"


class Migration(migrations.Migration):

dependencies = [
('netbox_branching', '0002_branch_schema_id_unique'),
]

operations = [
migrations.RunPython(
code=rename_indexes,
reverse_code=migrations.RunPython.noop
),
]
41 changes: 37 additions & 4 deletions netbox_branching/models/branches.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
from netbox.models.features import JobsMixin
from netbox.plugins import get_plugin_config
from netbox_branching.choices import BranchEventTypeChoices, BranchStatusChoices
from netbox_branching.constants import MAIN_SCHEMA
from netbox_branching.contextvars import active_branch
from netbox_branching.signals import *
from netbox_branching.utilities import (
ChangeSummary, activate_branch, get_branchable_object_types, get_tables_to_replicate, record_applied_change,
ChangeSummary, activate_branch, get_branchable_object_types, get_sql_results, get_tables_to_replicate,
record_applied_change,
)
from utilities.exceptions import AbortRequest, AbortTransaction
from .changes import ObjectChange
Expand Down Expand Up @@ -525,31 +527,34 @@ def provision(self, user):
# Create an empty copy of the global change log. Share the ID sequence from the main table to avoid
# reusing change record IDs.
table = ObjectChange_._meta.db_table
main_table = f'public.{table}'
main_table = f'{MAIN_SCHEMA}.{table}'
schema_table = f'{schema}.{table}'
logger.debug(f'Creating table {schema_table}')
cursor.execute(
f"CREATE TABLE {schema_table} ( LIKE {main_table} INCLUDING INDEXES )"
)
# Set the default value for the ID column to the sequence associated with the source table
sequence_name = f'public.{table}_id_seq'
sequence_name = f'{MAIN_SCHEMA}.{table}_id_seq'
cursor.execute(
f"ALTER TABLE {schema_table} ALTER COLUMN id SET DEFAULT nextval(%s)", [sequence_name]
)

# Replicate relevant tables from the main schema
for table in get_tables_to_replicate():
main_table = f'public.{table}'
main_table = f'{MAIN_SCHEMA}.{table}'
schema_table = f'{schema}.{table}'
logger.debug(f'Creating table {schema_table}')

# Create the table in the new schema
cursor.execute(
f"CREATE TABLE {schema_table} ( LIKE {main_table} INCLUDING INDEXES )"
)

# Copy data from the source table
cursor.execute(
f"INSERT INTO {schema_table} SELECT * FROM {main_table}"
)

# Get the name of the sequence used for object ID allocations (if one exists)
cursor.execute(
"SELECT pg_get_serial_sequence(%s, 'id')", [table]
Expand All @@ -560,6 +565,34 @@ def provision(self, user):
f"ALTER TABLE {schema_table} ALTER COLUMN id SET DEFAULT nextval(%s)", [sequence_name]
)

# Rename indexes to ensure consistency with the main schema for migration compatibility
cursor.execute(
f"SELECT tablename, indexname, indexdef FROM pg_indexes WHERE schemaname = '{schema}'"
)
for index in get_sql_results(cursor):
# Find the matching index in main based on its table & definition
definition = index.indexdef.split(' USING ', maxsplit=1)[1]
cursor.execute(
"SELECT indexname FROM pg_indexes WHERE schemaname=%s AND tablename=%s AND indexdef LIKE %s",
[MAIN_SCHEMA, index.tablename, f'% {definition}']
)
if result := cursor.fetchone():
# Rename the branch schema index (if needed)
original_name = index.indexname
new_name = result[0]
if new_name != original_name:
sql = f"ALTER INDEX {schema}.{original_name} RENAME TO {new_name}"
try:
cursor.execute(sql)
logger.debug(sql)
except Exception as e:
logger.error(sql)
raise e
else:
logger.warning(
f"Found no matching index in main for branch index {index.indexname}."
)

# Commit the transaction
cursor.execute("COMMIT")

Expand Down
13 changes: 13 additions & 0 deletions netbox_branching/tests/test_branches.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ def test_create_branch(self):
tables_found = {row.table_name for row in fetchall(cursor)}
self.assertSetEqual(tables_expected, tables_found)

# Check that all indexes were renamed to match the main schema
cursor.execute(
"SELECT idx_a.schemaname AS schema_a, idx_a.tablename AS table_a, idx_a.indexname AS index_in_a "
"FROM pg_indexes idx_a "
"WHERE idx_a.schemaname=%s "
"AND NOT EXISTS ("
" SELECT 1 FROM pg_indexes idx_b "
" WHERE idx_b.schemaname=%s AND idx_b.indexname=idx_a.indexname"
") ORDER BY idx_a.indexname",
[branch.schema_name, MAIN_SCHEMA]
)
self.assertListEqual(fetchall(cursor), [], "Found indexes with unique names in branch schema.")

# Check that object counts match the main schema for each table
for table_name in tables_to_replicate:
cursor.execute(f"SELECT COUNT(id) FROM {MAIN_SCHEMA}.{table_name}")
Expand Down
13 changes: 12 additions & 1 deletion netbox_branching/utilities.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
import logging
from collections import defaultdict
from collections import defaultdict, namedtuple
from contextlib import contextmanager, nullcontext
from dataclasses import dataclass

Expand All @@ -25,6 +25,7 @@
'deactivate_branch',
'get_active_branch',
'get_branchable_object_types',
'get_sql_results',
'get_tables_to_replicate',
'is_api_request',
'record_applied_change',
Expand Down Expand Up @@ -253,6 +254,16 @@ def get_active_branch(request):
return Branch.objects.filter(schema_id=schema_id, status=BranchStatusChoices.READY).first()


def get_sql_results(cursor):
"""
Return the results of the most recent SQL query as a list of named tuples.
"""
Result = namedtuple("Result", [col[0] for col in cursor.description])
return [
Result(*row) for row in cursor.fetchall()
]


@register_request_processor
def ActiveBranchContextManager(request):
"""
Expand Down