diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 2d3b513f..6fa60d8f 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -28,9 +28,8 @@ "isort.args": [ "--profile=black" ], - "isort.path": "/opt/netbox/venv/bin/isort", + "isort.path": ["/opt/netbox/venv/bin/isort"], "python.analysis.typeCheckingMode": "strict", - python.Jedi "python.analysis.extraPaths": [ "/opt/netbox/netbox" ], @@ -86,6 +85,7 @@ "aaron-bond.better-comments", "batisteo.vscode-django", "codezombiech.gitignore", + "eamodio.gitlens", "esbenp.prettier-vscode", "formulahendry.auto-rename-tag", "mintlify.document", diff --git a/.devcontainer/requirements-dev.txt b/.devcontainer/requirements-dev.txt index d4a50d5e..e772e592 100644 --- a/.devcontainer/requirements-dev.txt +++ b/.devcontainer/requirements-dev.txt @@ -13,3 +13,4 @@ pylint-django wily yapf sourcery-analytics +coverage diff --git a/.gitignore b/.gitignore index 2f771e82..28a01bed 100644 --- a/.gitignore +++ b/.gitignore @@ -163,3 +163,15 @@ cython_debug/ .vscode/ # JetBrains .idea/ + +# Temporary files +*.tmp +tmp/ + +# coverage +coverage/ +htmlcov/ +.coverage +.coverage.* +coverage.xml +*.cover diff --git a/Makefile b/Makefile index fb3ba16e..259948c0 100644 --- a/Makefile +++ b/Makefile @@ -75,8 +75,15 @@ rebuild: setup makemigrations migrate collectstatic start .PHONY: test test: setup - ${VENV_PY_PATH} ${NETBOX_MANAGE_PATH}/manage.py makemigrations ${PLUGIN_NAME} --check - ${VENV_PY_PATH} ${NETBOX_MANAGE_PATH}/manage.py test ${PLUGIN_NAME} + ${NETBOX_MANAGE_PATH}/manage.py makemigrations ${PLUGIN_NAME} --check + coverage run --source "netbox_acls" ${NETBOX_MANAGE_PATH}/manage.py test ${PLUGIN_NAME} -v 2 + +.PHONY: coverage_report +coverage_report: + coverage report + +.PHONY: test_coverage +test_coverage: test coverage_report #relpatch: # $(eval GSTATUS := $(shell git status --porcelain)) diff --git a/netbox_acls/forms/models.py b/netbox_acls/forms/models.py index 8d04d3a8..e034da6f 100644 --- a/netbox_acls/forms/models.py +++ b/netbox_acls/forms/models.py @@ -149,7 +149,6 @@ class Meta: } def __init__(self, *args, **kwargs): - # Initialize helper selectors instance = kwargs.get("instance") initial = kwargs.get("initial", {}).copy() @@ -324,7 +323,6 @@ class ACLInterfaceAssignmentForm(NetBoxModelForm): comments = CommentField() def __init__(self, *args, **kwargs): - # Initialize helper selectors instance = kwargs.get("instance") initial = kwargs.get("initial", {}).copy() diff --git a/netbox_acls/graphql/schema.py b/netbox_acls/graphql/schema.py index cdaa8a2c..0dc2dd27 100644 --- a/netbox_acls/graphql/schema.py +++ b/netbox_acls/graphql/schema.py @@ -3,6 +3,7 @@ from .types import * + class Query(ObjectType): """ Defines the queries available to this plugin via the graphql api. diff --git a/netbox_acls/graphql/types.py b/netbox_acls/graphql/types.py index 9e05942d..f43774c2 100644 --- a/netbox_acls/graphql/types.py +++ b/netbox_acls/graphql/types.py @@ -72,4 +72,3 @@ class Meta: model = models.ACLStandardRule fields = "__all__" filterset_class = filtersets.ACLStandardRuleFilterSet - diff --git a/netbox_acls/migrations/0002_alter_accesslist_options_and_more.py b/netbox_acls/migrations/0002_alter_accesslist_options_and_more.py index ea5ab861..902248f0 100644 --- a/netbox_acls/migrations/0002_alter_accesslist_options_and_more.py +++ b/netbox_acls/migrations/0002_alter_accesslist_options_and_more.py @@ -6,7 +6,6 @@ class Migration(migrations.Migration): - dependencies = [ ("contenttypes", "0002_remove_content_type_name"), ("netbox_acls", "0001_initial"), diff --git a/netbox_acls/migrations/0003_netbox_acls.py b/netbox_acls/migrations/0003_netbox_acls.py index 562e0f3d..0af5aef4 100644 --- a/netbox_acls/migrations/0003_netbox_acls.py +++ b/netbox_acls/migrations/0003_netbox_acls.py @@ -5,7 +5,6 @@ class Migration(migrations.Migration): - dependencies = [ ("netbox_acls", "0002_alter_accesslist_options_and_more"), ] diff --git a/netbox_acls/migrations/0004_alter_accesslist_name.py b/netbox_acls/migrations/0004_alter_accesslist_name.py new file mode 100644 index 00000000..3086dacf --- /dev/null +++ b/netbox_acls/migrations/0004_alter_accesslist_name.py @@ -0,0 +1,26 @@ +# Generated by Django 4.1.5 on 2023-02-02 22:34 + +import django.core.validators +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("netbox_acls", "0003_netbox_acls"), + ] + + operations = [ + migrations.AlterField( + model_name="accesslist", + name="name", + field=models.CharField( + max_length=500, + validators=[ + django.core.validators.RegexValidator( + "^[a-zA-Z0-9-_]+$", + "Only alphanumeric, hyphens, and underscores characters are allowed.", + ) + ], + ), + ), + ] diff --git a/netbox_acls/models/access_lists.py b/netbox_acls/models/access_lists.py index 1feb6353..31185ef4 100644 --- a/netbox_acls/models/access_lists.py +++ b/netbox_acls/models/access_lists.py @@ -5,6 +5,7 @@ from dcim.models import Device, Interface, VirtualChassis from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError from django.core.validators import RegexValidator from django.db import models from django.urls import reverse @@ -21,7 +22,7 @@ alphanumeric_plus = RegexValidator( - r"^[0-9a-zA-Z,-,_]*$", + r"^[a-zA-Z0-9-_]+$", "Only alphanumeric, hyphens, and underscores characters are allowed.", ) @@ -146,6 +147,22 @@ def get_absolute_url(self): args=[self.pk], ) + def clean(self): + super().clean() + + # Get the model type of the assigned interface. + if self.assigned_object_type.model_class() == VMInterface: + interface_host = self.assigned_object.virtual_machine + elif self.assigned_object_type.model_class() == Interface: + interface_host = self.assigned_object.device + # Check if the assigned interface's host is the same as the host assigned to the access list. + if interface_host != self.access_list.assigned_object: + raise ValidationError( + { + "assigned_object": "The assigned object must be the same as the device assigned to it." + } + ) + @classmethod def get_prerequisite_models(cls): return [AccessList] diff --git a/netbox_acls/tests/test_models.py b/netbox_acls/tests/test_models.py new file mode 100644 index 00000000..8816b841 --- /dev/null +++ b/netbox_acls/tests/test_models.py @@ -0,0 +1,367 @@ +from itertools import cycle + +from dcim.models import ( + Device, + DeviceRole, + DeviceType, + Interface, + Manufacturer, + Site, + VirtualChassis, +) +from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError +from django.test import TestCase +from ipam.models import Prefix +from netaddr import IPNetwork +from virtualization.models import Cluster, ClusterType, VirtualMachine, VMInterface + +from netbox_acls.models import * + + +class BaseTestCase(TestCase): + """ + Base test case for netbox_acls models. + """ + + @classmethod + def setUpTestData(cls): + """ + Create base data to test using including: + - 1 of each of the following: test site, manufacturer, device type, device role, cluster type, cluster, virtual_chassis, & virtual machine + - 2 devices, prefixes, 2 interfaces, and 2 vminterfaces + """ + + site = Site.objects.create(name="Site 1", slug="site-1") + manufacturer = Manufacturer.objects.create( + name="Manufacturer 1", + slug="manufacturer-1", + ) + devicetype = DeviceType.objects.create( + manufacturer=manufacturer, + model="Device Type 1", + ) + devicerole = DeviceRole.objects.create( + name="Device Role 1", + slug="device-role-1", + ) + device = Device.objects.create( + name="Device 1", + site=site, + device_type=devicetype, + device_role=devicerole, + ) + # virtual_chassis = VirtualChassis.objects.create(name="Virtual Chassis 1") + # virtual_chassis_member = Device.objects.create( + # name="VC Device", + # site=site, + # device_type=devicetype, + # device_role=devicerole, + # virtual_chassis=virtual_chassis, + # vc_position=1, + # ) + # cluster_member = Device.objects.create( + # name="Cluster Device", + # site=site, + # device_type=devicetype, + # device_role=devicerole, + # ) + # clustertype = ClusterType.objects.create(name="Cluster Type 1") + # cluster = Cluster.objects.create( + # name="Cluster 1", + # type=clustertype, + # ) + virtual_machine = VirtualMachine.objects.create(name="VirtualMachine 1") + virtual_machine.save() + prefix = Prefix.objects.create(prefix="10.0.0.0/8") + + +class TestAccessList(BaseTestCase): + """ + Test AccessList model. + """ + + common_acl_params = { + "type": "extended", + "default_action": "permit", + } + # device = Device.objects.first() + + def test_wrong_assigned_object_type_fail(self): + """ + Test that AccessList cannot be assigned to an object type other than Device, VirtualChassis, VirtualMachine, or Cluster. + """ + acl_bad_gfk = AccessList( + name="TestACL_Wrong_GFK", + assigned_object_type=ContentType.objects.get_for_model(Prefix), + assigned_object_id=Prefix.objects.first(), + **self.common_acl_params, + ) + with self.assertRaises(ValidationError): + acl_bad_gfk.full_clean() + + def test_alphanumeric_plus_success(self): + """ + Test that AccessList names with alphanumeric characters, '_', or '-' pass validation. + """ + acl_good_name = AccessList( + name="Testacl-Good_Name-1", + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, # TODO - replace with Device.objects.first() + **self.common_acl_params, + ) + acl_good_name.full_clean() + # TODO: test_alphanumeric_plus_success - VirtualChassis, VirtualMachine & Cluster + + def test_duplicate_name_success(self): + """ + Test that AccessList names can be non-unique if associated to different devices. + """ + AccessList.objects.create( + name="GOOD-DUPLICATE-ACL", + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, # TODO - replace with Device.objects.first() + **self.common_acl_params, + ) + vm_acl = AccessList( + name="GOOD-DUPLICATE-ACL", + assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), + assigned_object_id=1, # TODO - replace with VirtualMachine.objects.first().id, + **self.common_acl_params, + ) + vm_acl.full_clean() + # TODO: test_duplicate_name_success - VirtualChassis, VirtualMachine & Cluster + # vc_acl = AccessList( + # "name": "GOOD-DUPLICATE-ACL", + # assigned_object_type=ContentType.objects.get_for_model(VirtualChassis), + # **self.common_acl_params, + # ) + # vc_acl.full_clean() + + def test_alphanumeric_plus_fail(self): + """ + Test that AccessList names with non-alphanumeric (exluding '_' and '-') characters fail validation. + """ + non_alphanumeric_plus_chars = " !@#$%^&*()[]{};:,./<>?\|~=+" + + for i, char in enumerate(non_alphanumeric_plus_chars, start=1): + bad_acl_name = AccessList( + name=f"Testacl-bad_name_{i}_{char}", + assigned_object_type=ContentType.objects.get_for_model(Device), + comments=f'ACL with "{char}" in name', + **self.common_acl_params, + ) + with self.assertRaises(ValidationError): + bad_acl_name.full_clean() + + def test_duplicate_name_fail(self): + """ + Test that AccessList names must be unique per device. + """ + params = { + "name": "FAIL-DUPLICATE-ACL", + "assigned_object_type": ContentType.objects.get_for_model(Device), + **self.common_acl_params, + "assigned_object_id": 1, # TODO - replace with Device.objects.first() + } + acl_1 = AccessList.objects.create(**params) + acl_1.save() + acl_2 = AccessList(**params) + with self.assertRaises(ValidationError): + acl_2.full_clean() + # TODO: test_duplicate_name_fail - VirtualChassis & Cluster + + def test_valid_acl_choices(self): + """ + Test that AccessList action choices using VALID choices. + """ + valid_acl_default_action_choices = ["permit", "deny"] + valid_acl_types = ["standard", "extended"] + if len(valid_acl_default_action_choices) > len(valid_acl_types): + valid_acl_choices = list( + zip(valid_acl_default_action_choices, cycle(valid_acl_types)) + ) + elif len(valid_acl_default_action_choices) < len(valid_acl_types): + valid_acl_choices = list( + zip(cycle(valid_acl_default_action_choices), valid_acl_types) + ) + else: + valid_acl_choices = list( + zip(valid_acl_default_action_choices, valid_acl_types) + ) + + for default_action, acl_type in valid_acl_choices: + valid_acl_choice = AccessList( + name=f"TestACL_Valid_Choice_{default_action}_{acl_type}", + comments=f"VALID ACL CHOICES USED: {default_action=} {acl_type=}", + type=acl_type, + default_action=default_action, + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, # TODO - replace with Device.objects.first() + ) + valid_acl_choice.full_clean() + + def test_invalid_acl_choices(self): + """ + Test that AccessList action choices using INVALID choices. + """ + valid_acl_types = ["standard", "extended"] + invalid_acl_default_action_choice = "log" + invalid_acl_default_action = AccessList( + name=f"TestACL_Valid_Choice_{invalid_acl_default_action_choice}_{valid_acl_types[0]}", + comments=f"INVALID ACL DEFAULT CHOICE USED: default_action='{invalid_acl_default_action_choice}'", + type=valid_acl_types[0], + default_action=invalid_acl_default_action_choice, + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, # TODO - replace with Device.objects.first() + ) + with self.assertRaises(ValidationError): + invalid_acl_default_action.full_clean() + + valid_acl_default_action_choices = ["permit", "deny"] + invalid_acl_type = "super-dupper-extended" + invalid_acl_type = AccessList( + name=f"TestACL_Valid_Choice_{valid_acl_default_action_choices[0]}_{invalid_acl_type}", + comments=f"INVALID ACL DEFAULT CHOICE USED: type='{invalid_acl_type}'", + type=invalid_acl_type, + default_action=valid_acl_default_action_choices[0], + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, # TODO - replace with Device.objects.first() + ) + with self.assertRaises(ValidationError): + invalid_acl_type.full_clean() + + +class TestACLInterfaceAssignment(BaseTestCase): + """ + Test ACLInterfaceAssignment model. + """ + + @classmethod + def setUpTestData(cls): + """ + Extend BaseTestCase's setUpTestData() to create additional data for testing. + """ + super().setUpTestData() + device = Device.objects.first() + interfaces = Interface.objects.bulk_create( + ( + Interface(name="Interface 1", device=device, type="1000baset"), + Interface(name="Interface 2", device=device, type="1000baset"), + ) + ) + virtual_machine = VirtualMachine.objects.first() + vminterfaces = VMInterface.objects.bulk_create( + ( + VMInterface(name="Interface 1", virtual_machine=virtual_machine), + VMInterface(name="Interface 2", virtual_machine=virtual_machine), + ) + ) + #prefixes = Prefix.objects.bulk_create( + # ( + # Prefix(prefix=IPNetwork("10.0.0.0/24")), + # Prefix(prefix=IPNetwork("192.168.1.0/24")), + # ) + #) + + def test_acl_interface_assignment_success(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the interface and direction. + """ + device_acl = AccessList( + name="STANDARD_ACL", + comments="STANDARD_ACL", + type="standard", + default_action="permit", + assigned_object=Device.objects.first(), + ) + device_acl.save() + acl_device_interface = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=Interface.objects.first(), + ) + acl_device_interface.full_clean() + + def test_aclinterface_assignment_fail(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the vminterface and direction. + """ + device_acl = AccessList( + name="STANDARD_ACL", + comments="STANDARD_ACL", + type="standard", + default_action="permit", + assigned_object=Device.objects.first(), + ) + device_acl.save() + acl_vm_interface = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=VMInterface.objects.first(), + ) + with self.assertRaises(ValidationError): + acl_vm_interface.full_clean() + + def test_acl_vminterface_assignment_success(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the vminterface and direction. + """ + vm_acl = AccessList( + name="STANDARD_ACL", + comments="STANDARD_ACL", + type="standard", + default_action="permit", + assigned_object_id=1, + assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), + ) + vm_acl.save() + acl_vm_interface = ACLInterfaceAssignment( + access_list=vm_acl, + direction="ingress", + assigned_object_id=1, + assigned_object_type=ContentType.objects.get_for_model(VMInterface), + ) + acl_vm_interface.full_clean() + + def test_acl_interface_assignment_fail(self): + """ + Test that ACLInterfaceAssignment fails validation if the ACL is not assigned to the parent host. + """ + pass + # TODO: test_acl_interface_assignment_fail - VM & Device + + def test_duplicate_assignment_fail(self): + """ + Test that ACLInterfaceAssignment fails validation if the ACL already is assigned to the same interface and direction. + """ + pass + # TODO: test_duplicate_assignment_fail - VM & Device + + def test_acl_already_assinged_fail(self): + """ + Test that ACLInterfaceAssignment fails validation if the interface already has an ACL assigned in the same direction. + """ + pass + ## TODO: test_acl_already_assinged_fail - VM & Device + + # TODO: Test choices for ACLInterfaceAssignment Model + + +# TODO: Investigate a Base model for ACLStandardRule and ACLExtendedRule + + +class TestACLStandardRule(BaseTestCase): + """ + Test ACLStandardRule model. + """ + + # TODO: Develop tests for ACLStandardRule model + + +class ACLExtendedRule(BaseTestCase): + """ + Test ACLExtendedRule model. + """ + + # TODO: Develop tests for ACLExtendedRule model diff --git a/test.sh b/test.sh index 5fc2e063..7f167232 100755 --- a/test.sh +++ b/test.sh @@ -17,7 +17,7 @@ doco="docker compose --file docker-compose.yml" test_netbox_unit_tests() { echo "⏱ Running NetBox Unit Tests" $doco run --rm netbox python manage.py makemigrations netbox_acls --check - $doco run --rm netbox python manage.py test netbox_acls + $doco run --rm netbox python manage.py test netbox_acls -v 2 } test_cleanup() {