From 0df90b3f28cadb47256313cf3b2f6b8d0e417176 Mon Sep 17 00:00:00 2001 From: ryanmerolle Date: Thu, 2 Feb 2023 22:39:23 +0000 Subject: [PATCH 01/16] add draft test for plugin models --- netbox_acls/tests/test_models.py | 238 +++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 netbox_acls/tests/test_models.py diff --git a/netbox_acls/tests/test_models.py b/netbox_acls/tests/test_models.py new file mode 100644 index 00000000..8c50c788 --- /dev/null +++ b/netbox_acls/tests/test_models.py @@ -0,0 +1,238 @@ +from dcim.models import ( + Device, + DeviceRole, + DeviceType, + Interface, + Manufacturer, + Site, + VirtualChassis, +) +from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError +from django.test import TestCase +from ipam.models import Prefix +from netaddr import IPNetwork +from virtualization.models import Cluster, ClusterType, VirtualMachine, VMInterface + +from netbox_acls.choices import * +from netbox_acls.models import * + + +class BaseTestCase(TestCase): + """ + Base test case for netbox_acls models. + """ + + @classmethod + def setUpTestData(cls): + """ + Create base data to test using including: + - 1 of each of the following: test site, manufacturer, device type, device role, cluster type, cluster, virtual_chassis, & virtual machine + - 2 devices, prefixes, 2 interfaces, and 2 vminterfaces + """ + + site = Site.objects.create(name="Site 1", slug="site-1") + manufacturer = Manufacturer.objects.create( + name="Manufacturer 1", + slug="manufacturer-1", + ) + devicetype = DeviceType.objects.create( + manufacturer=manufacturer, + model="Device Type 1", + ) + devicerole = DeviceRole.objects.create( + name="Device Role 1", + slug="device-role-1", + ) + device = Device.objects.create( + name="Device 1", + site=site, + device_type=devicetype, + device_role=devicerole, + ) + virtual_chassis = VirtualChassis.objects.create(name="Virtual Chassis 1") + virtual_chassis_member = Device.objects.create( + name="VC Device", + site=site, + device_type=devicetype, + device_role=devicerole, + virtual_chassis=virtual_chassis, + vc_position=1, + ) + cluster_member = Device.objects.create( + name="Cluster Device", + site=site, + device_type=devicetype, + device_role=devicerole, + ) + clustertype = ClusterType.objects.create(name="Cluster Type 1") + cluster = Cluster.objects.create( + name="Cluster 1", + type=clustertype, + ) + virtual_machine = VirtualMachine.objects.create(name="VirtualMachine 1") + + +class TestAccessList(BaseTestCase): + """ + Test AccessList model. + """ + + def test_alphanumeric_plus_success(self): + """ + Test that AccessList names with alphanumeric characters, '_', or '-' pass validation. + """ + acl_good_name = AccessList( + name="Testacl-good_name-1", + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, + type=ACLTypeChoices.TYPE_EXTENDED, + default_action=ACLActionChoices.ACTION_PERMIT, + ) + acl_good_name.full_clean() + # TODO: test_alphanumeric_plus_success - VirtualChassis & Cluster + + def test_duplicate_name_success(self): + """ + Test that AccessList names can be non-unique if associated to different devices. + """ + + params = { + "name": "GOOD-DUPLICATE-ACL", + "type": ACLTypeChoices.TYPE_STANDARD, + "default_action": ACLActionChoices.ACTION_PERMIT, + } + AccessList.objects.create( + **params, + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, + ) + vm_acl = AccessList( + **params, + assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), + assigned_object_id=1, + ) + vm_acl.full_clean() + # TODO: test_duplicate_name_success - VirtualChassis & Cluster + #vc_acl = AccessList( + # **params, + # assigned_object_type=ContentType.objects.get_for_model(VirtualChassis), + # assigned_object_id=1, + #) + #vc_acl.full_clean() + + def test_alphanumeric_plus_fail(self): + """ + Test that AccessList names with non-alphanumeric (exluding '_' and '-') characters fail validation. + """ + non_alphanumeric_plus_chars = " !@#$%^&*()[]{};:,./<>?\|~=+" + + for i, char in enumerate(non_alphanumeric_plus_chars, start=1): + bad_acl_name = AccessList( + name=f"Testacl-bad_name_{i}_{char}", + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, + type=ACLTypeChoices.TYPE_EXTENDED, + default_action=ACLActionChoices.ACTION_PERMIT, + comments=f'ACL with "{char}" in name', + ) + with self.assertRaises(ValidationError): + bad_acl_name.full_clean() + + def test_duplicate_name_fail(self): + """ + Test that AccessList names must be unique per device. + """ + params = { + "name": "FAIL-DUPLICATE-ACL", + "assigned_object_type": ContentType.objects.get_for_model(Device), + "assigned_object_id": 1, + "type": ACLTypeChoices.TYPE_STANDARD, + "default_action": ACLActionChoices.ACTION_PERMIT, + } + acl_1 = AccessList.objects.create(**params) + acl_1.save() + acl_2 = AccessList(**params) + with self.assertRaises(ValidationError): + acl_2.full_clean() + # TODO: test_duplicate_name_fail - VirtualChassis & Cluster + + # TODO: Test choices for AccessList Model + + +class TestACLInterfaceAssignment(BaseTestCase): + """ + Test ACLInterfaceAssignment model. + """ + + @classmethod + def setUpTestData(cls): + """ + Extend BaseTestCase's setUpTestData() to create additional data for testing. + """ + super().setUpTestData() + + #interfaces = Interface.objects.bulk_create( + # ( + # Interface(name="Interface 1", device=device, type="1000baset"), + # Interface(name="Interface 2", device=device, type="1000baset"), + # ) + #) + #vminterfaces = VMInterface.objects.bulk_create( + # ( + # VMInterface(name="Interface 1", virtual_machine=virtual_machine), + # VMInterface(name="Interface 2", virtual_machine=virtual_machine), + # ) + #) + # prefixes = Prefix.objects.bulk_create( + # ( + # Prefix(prefix=IPNetwork("10.0.0.0/24")), + # Prefix(prefix=IPNetwork("192.168.1.0/24")), + # ) + # ) + + def test_acl_interface_assignment_success(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the interface and direction. + """ + pass + # TODO: test_acl_interface_assignment_success - VM & Device + + def test_acl_interface_assignment_fail(self): + """ + Test that ACLInterfaceAssignment fails validation if the ACL is not assigned to the parent host. + """ + pass + # TODO: test_acl_interface_assignment_fail - VM & Device + + def test_duplicate_assignment_fail(self): + """ + Test that ACLInterfaceAssignment fails validation if the ACL already is assigned to the same interface and direction. + """ + pass + # TODO: test_duplicate_assignment_fail - VM & Device + + def test_acl_already_assinged_fail(self): + """ + Test that ACLInterfaceAssignment fails validation if the interface already has an ACL assigned in the same direction. + """ + pass + ## TODO: test_acl_already_assinged_fail - VM & Device + + # TODO: Test choices for ACLInterfaceAssignment Model + + +# TODO: Investigate a Base model for ACLStandardRule and ACLExtendedRule + +class TestACLStandardRule(BaseTestCase): + """ + Test ACLStandardRule model. + """ + # TODO: Develop tests for ACLStandardRule model + + +class ACLExtendedRule(BaseTestCase): + """ + Test ACLExtendedRule model. + """ + # TODO: Develop tests for ACLExtendedRule model From 0cd43ef8e6c62113903fe1d4439ce335ff12653c Mon Sep 17 00:00:00 2001 From: ryanmerolle Date: Fri, 3 Feb 2023 14:38:28 +0000 Subject: [PATCH 02/16] minimize duplicate code in model tests --- netbox_acls/tests/test_models.py | 65 ++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/netbox_acls/tests/test_models.py b/netbox_acls/tests/test_models.py index 8c50c788..5e3af74a 100644 --- a/netbox_acls/tests/test_models.py +++ b/netbox_acls/tests/test_models.py @@ -71,6 +71,7 @@ def setUpTestData(cls): type=clustertype, ) virtual_machine = VirtualMachine.objects.create(name="VirtualMachine 1") + prefix = Prefix.objects.create(prefix="10.0.0.0/8") class TestAccessList(BaseTestCase): @@ -78,19 +79,35 @@ class TestAccessList(BaseTestCase): Test AccessList model. """ + common_acl_params = { + "assigned_object_id": 1, + "type": ACLTypeChoices.TYPE_EXTENDED, + "default_action": ACLActionChoices.ACTION_PERMIT, + } + + def test_wrong_assigned_object_type_fail(self): + """ + Test that AccessList cannot be assigned to an object type other than Device, VirtualChassis, VirtualMachine, or Cluster. + """ + acl_bad_gfk = AccessList( + name="TestACL_Wrong_GFK", + assigned_object_type=ContentType.objects.get_for_model(Prefix), + **self.common_acl_params, + ) + with self.assertRaises(ValidationError): + acl_bad_gfk.full_clean() + def test_alphanumeric_plus_success(self): """ Test that AccessList names with alphanumeric characters, '_', or '-' pass validation. """ acl_good_name = AccessList( - name="Testacl-good_name-1", + name="Testacl-Good_Name-1", assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, - type=ACLTypeChoices.TYPE_EXTENDED, - default_action=ACLActionChoices.ACTION_PERMIT, + **self.common_acl_params, ) acl_good_name.full_clean() - # TODO: test_alphanumeric_plus_success - VirtualChassis & Cluster + # TODO: test_alphanumeric_plus_success - VirtualChassis, VirtualMachine & Cluster def test_duplicate_name_success(self): """ @@ -103,23 +120,23 @@ def test_duplicate_name_success(self): "default_action": ACLActionChoices.ACTION_PERMIT, } AccessList.objects.create( - **params, + name="GOOD-DUPLICATE-ACL", assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, + **self.common_acl_params, ) vm_acl = AccessList( - **params, + name="GOOD-DUPLICATE-ACL", assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), - assigned_object_id=1, + **self.common_acl_params, ) vm_acl.full_clean() - # TODO: test_duplicate_name_success - VirtualChassis & Cluster - #vc_acl = AccessList( - # **params, + # TODO: test_duplicate_name_success - VirtualChassis, VirtualMachine & Cluster + # vc_acl = AccessList( + # "name": "GOOD-DUPLICATE-ACL", # assigned_object_type=ContentType.objects.get_for_model(VirtualChassis), - # assigned_object_id=1, - #) - #vc_acl.full_clean() + # **self.common_acl_params, + # ) + # vc_acl.full_clean() def test_alphanumeric_plus_fail(self): """ @@ -131,10 +148,8 @@ def test_alphanumeric_plus_fail(self): bad_acl_name = AccessList( name=f"Testacl-bad_name_{i}_{char}", assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, - type=ACLTypeChoices.TYPE_EXTENDED, - default_action=ACLActionChoices.ACTION_PERMIT, comments=f'ACL with "{char}" in name', + **self.common_acl_params, ) with self.assertRaises(ValidationError): bad_acl_name.full_clean() @@ -146,9 +161,8 @@ def test_duplicate_name_fail(self): params = { "name": "FAIL-DUPLICATE-ACL", "assigned_object_type": ContentType.objects.get_for_model(Device), + **self.common_acl_params, "assigned_object_id": 1, - "type": ACLTypeChoices.TYPE_STANDARD, - "default_action": ACLActionChoices.ACTION_PERMIT, } acl_1 = AccessList.objects.create(**params) acl_1.save() @@ -172,18 +186,18 @@ def setUpTestData(cls): """ super().setUpTestData() - #interfaces = Interface.objects.bulk_create( + # interfaces = Interface.objects.bulk_create( # ( # Interface(name="Interface 1", device=device, type="1000baset"), # Interface(name="Interface 2", device=device, type="1000baset"), # ) - #) - #vminterfaces = VMInterface.objects.bulk_create( + # ) + # vminterfaces = VMInterface.objects.bulk_create( # ( # VMInterface(name="Interface 1", virtual_machine=virtual_machine), # VMInterface(name="Interface 2", virtual_machine=virtual_machine), # ) - #) + # ) # prefixes = Prefix.objects.bulk_create( # ( # Prefix(prefix=IPNetwork("10.0.0.0/24")), @@ -224,10 +238,12 @@ def test_acl_already_assinged_fail(self): # TODO: Investigate a Base model for ACLStandardRule and ACLExtendedRule + class TestACLStandardRule(BaseTestCase): """ Test ACLStandardRule model. """ + # TODO: Develop tests for ACLStandardRule model @@ -235,4 +251,5 @@ class ACLExtendedRule(BaseTestCase): """ Test ACLExtendedRule model. """ + # TODO: Develop tests for ACLExtendedRule model From 06db83a94b39078d403fe9755b51c9461b2ef67a Mon Sep 17 00:00:00 2001 From: ryanmerolle Date: Fri, 3 Feb 2023 22:16:30 +0000 Subject: [PATCH 03/16] build acl models tests --- netbox_acls/tests/test_models.py | 72 ++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 9 deletions(-) diff --git a/netbox_acls/tests/test_models.py b/netbox_acls/tests/test_models.py index 5e3af74a..d7dc2ac7 100644 --- a/netbox_acls/tests/test_models.py +++ b/netbox_acls/tests/test_models.py @@ -1,3 +1,5 @@ +from itertools import cycle + from dcim.models import ( Device, DeviceRole, @@ -14,7 +16,6 @@ from netaddr import IPNetwork from virtualization.models import Cluster, ClusterType, VirtualMachine, VMInterface -from netbox_acls.choices import * from netbox_acls.models import * @@ -81,8 +82,8 @@ class TestAccessList(BaseTestCase): common_acl_params = { "assigned_object_id": 1, - "type": ACLTypeChoices.TYPE_EXTENDED, - "default_action": ACLActionChoices.ACTION_PERMIT, + "type": "extended", + "default_action": "permit", } def test_wrong_assigned_object_type_fail(self): @@ -114,11 +115,6 @@ def test_duplicate_name_success(self): Test that AccessList names can be non-unique if associated to different devices. """ - params = { - "name": "GOOD-DUPLICATE-ACL", - "type": ACLTypeChoices.TYPE_STANDARD, - "default_action": ACLActionChoices.ACTION_PERMIT, - } AccessList.objects.create( name="GOOD-DUPLICATE-ACL", assigned_object_type=ContentType.objects.get_for_model(Device), @@ -171,7 +167,65 @@ def test_duplicate_name_fail(self): acl_2.full_clean() # TODO: test_duplicate_name_fail - VirtualChassis & Cluster - # TODO: Test choices for AccessList Model + def test_valid_acl_choices(self): + """ + Test that AccessList action choices using VALID choices. + """ + valid_acl_default_action_choices = ["permit", "deny"] + valid_acl_types = ["standard", "extended"] + if len(valid_acl_default_action_choices) > len(valid_acl_types): + valid_acl_choices = list( + zip(valid_acl_default_action_choices, cycle(valid_acl_types)) + ) + elif len(valid_acl_default_action_choices) < len(valid_acl_types): + valid_acl_choices = list( + zip(cycle(valid_acl_default_action_choices), valid_acl_types) + ) + else: + valid_acl_choices = list( + zip(valid_acl_default_action_choices, valid_acl_types) + ) + + for default_action, acl_type in valid_acl_choices: + valid_acl_choice = AccessList( + name=f"TestACL_Valid_Choice_{default_action}_{acl_type}", + comments=f"VALID ACL CHOICES USED: {default_action=} {acl_type=}", + type=acl_type, + default_action=default_action, + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, + ) + valid_acl_choice.full_clean() + + def test_invalid_acl_choices(self): + """ + Test that AccessList action choices using INVALID choices. + """ + valid_acl_types = ["standard", "extended"] + invalid_acl_default_action_choice = "log" + invalid_acl_default_action = AccessList( + name=f"TestACL_Valid_Choice_{invalid_acl_default_action_choice}_{valid_acl_types[0]}", + comments=f"INVALID ACL DEFAULT CHOICE USED: default_action='{invalid_acl_default_action_choice}'", + type=valid_acl_types[0], + default_action=invalid_acl_default_action_choice, + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, + ) + with self.assertRaises(ValidationError): + invalid_acl_default_action.full_clean() + + valid_acl_default_action_choices = ["permit", "deny"] + invalid_acl_type = "super-dupper-extended" + invalid_acl_type = AccessList( + name=f"TestACL_Valid_Choice_{valid_acl_default_action_choices[0]}_{invalid_acl_type}", + comments=f"INVALID ACL DEFAULT CHOICE USED: type='{invalid_acl_type}'", + type=invalid_acl_type, + default_action=valid_acl_default_action_choices[0], + assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, + ) + with self.assertRaises(ValidationError): + invalid_acl_type.full_clean() class TestACLInterfaceAssignment(BaseTestCase): From e2f33d73037b72c8d103568beaff18185e7711f0 Mon Sep 17 00:00:00 2001 From: ryanmerolle Date: Sun, 5 Feb 2023 13:47:32 +0000 Subject: [PATCH 04/16] test ACLInterfaceAssignment draft --- netbox_acls/tests/test_models.py | 93 ++++++++++++++++++++++---------- 1 file changed, 66 insertions(+), 27 deletions(-) diff --git a/netbox_acls/tests/test_models.py b/netbox_acls/tests/test_models.py index d7dc2ac7..945788d5 100644 --- a/netbox_acls/tests/test_models.py +++ b/netbox_acls/tests/test_models.py @@ -81,10 +81,10 @@ class TestAccessList(BaseTestCase): """ common_acl_params = { - "assigned_object_id": 1, "type": "extended", "default_action": "permit", } + # device = Device.objects.first() def test_wrong_assigned_object_type_fail(self): """ @@ -93,6 +93,7 @@ def test_wrong_assigned_object_type_fail(self): acl_bad_gfk = AccessList( name="TestACL_Wrong_GFK", assigned_object_type=ContentType.objects.get_for_model(Prefix), + assigned_object_id=Prefix.objects.first(), **self.common_acl_params, ) with self.assertRaises(ValidationError): @@ -105,6 +106,7 @@ def test_alphanumeric_plus_success(self): acl_good_name = AccessList( name="Testacl-Good_Name-1", assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, # TODO - replace with Device.objects.first() **self.common_acl_params, ) acl_good_name.full_clean() @@ -114,15 +116,16 @@ def test_duplicate_name_success(self): """ Test that AccessList names can be non-unique if associated to different devices. """ - AccessList.objects.create( name="GOOD-DUPLICATE-ACL", assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object_id=1, # TODO - replace with Device.objects.first() **self.common_acl_params, ) vm_acl = AccessList( name="GOOD-DUPLICATE-ACL", assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), + assigned_object_id=1, # TODO - replace with VirtualMachine.objects.first().id, **self.common_acl_params, ) vm_acl.full_clean() @@ -158,7 +161,7 @@ def test_duplicate_name_fail(self): "name": "FAIL-DUPLICATE-ACL", "assigned_object_type": ContentType.objects.get_for_model(Device), **self.common_acl_params, - "assigned_object_id": 1, + "assigned_object_id": 1, # TODO - replace with Device.objects.first() } acl_1 = AccessList.objects.create(**params) acl_1.save() @@ -193,7 +196,7 @@ def test_valid_acl_choices(self): type=acl_type, default_action=default_action, assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, + assigned_object_id=1, # TODO - replace with Device.objects.first() ) valid_acl_choice.full_clean() @@ -209,7 +212,7 @@ def test_invalid_acl_choices(self): type=valid_acl_types[0], default_action=invalid_acl_default_action_choice, assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, + assigned_object_id=1, # TODO - replace with Device.objects.first() ) with self.assertRaises(ValidationError): invalid_acl_default_action.full_clean() @@ -222,7 +225,7 @@ def test_invalid_acl_choices(self): type=invalid_acl_type, default_action=valid_acl_default_action_choices[0], assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, + assigned_object_id=1, # TODO - replace with Device.objects.first() ) with self.assertRaises(ValidationError): invalid_acl_type.full_clean() @@ -239,32 +242,68 @@ def setUpTestData(cls): Extend BaseTestCase's setUpTestData() to create additional data for testing. """ super().setUpTestData() - - # interfaces = Interface.objects.bulk_create( - # ( - # Interface(name="Interface 1", device=device, type="1000baset"), - # Interface(name="Interface 2", device=device, type="1000baset"), - # ) - # ) - # vminterfaces = VMInterface.objects.bulk_create( - # ( - # VMInterface(name="Interface 1", virtual_machine=virtual_machine), - # VMInterface(name="Interface 2", virtual_machine=virtual_machine), - # ) - # ) - # prefixes = Prefix.objects.bulk_create( - # ( - # Prefix(prefix=IPNetwork("10.0.0.0/24")), - # Prefix(prefix=IPNetwork("192.168.1.0/24")), - # ) - # ) + device = Device.objects.first() + interfaces = Interface.objects.bulk_create( + ( + Interface(name="Interface 1", device=device, type="1000baset"), + Interface(name="Interface 2", device=device, type="1000baset"), + ) + ) + virtual_machine = VirtualMachine.objects.first() + vminterfaces = VMInterface.objects.bulk_create( + ( + VMInterface(name="Interface 1", virtual_machine=virtual_machine), + VMInterface(name="Interface 2", virtual_machine=virtual_machine), + ) + ) + prefixes = Prefix.objects.bulk_create( + ( + Prefix(prefix=IPNetwork("10.0.0.0/24")), + Prefix(prefix=IPNetwork("192.168.1.0/24")), + ) + ) def test_acl_interface_assignment_success(self): """ Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the interface and direction. """ - pass - # TODO: test_acl_interface_assignment_success - VM & Device + device_acl = AccessList( + name="STANDARD_ACL", + comments="STANDARD_ACL", + type="standard", + default_action="permit", + assigned_object_id=1, + assigned_object_type=ContentType.objects.get_for_model(Device), + ) + device_acl.save() + acl_device_interface = ACLInterfaceAssignment( + access_list_id=device_acl.pk, + direction="ingress", + assigned_object_id=1, + assigned_object_type=ContentType.objects.get_for_model(Interface), + ) + acl_device_interface.full_clean() + + def test_acl_vminterface_assignment_success(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the vminterface and direction. + """ + vm_acl = AccessList( + name="STANDARD_ACL", + comments="STANDARD_ACL", + type="standard", + default_action="permit", + assigned_object_id=1, + assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), + ) + vm_acl.save() + acl_vm_interface = ACLInterfaceAssignment( + access_list=vm_acl.pk, + direction="ingress", + assigned_object_id=1, + assigned_object_type=ContentType.objects.get_for_model(VMInterface), + ) + acl_vm_interface.full_clean() def test_acl_interface_assignment_fail(self): """ From c79433cdc84e2f32a478898155fd62950ecc8dca Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Sun, 5 Feb 2023 20:27:56 +0530 Subject: [PATCH 05/16] fixed typo --- netbox_acls/tests/test_models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netbox_acls/tests/test_models.py b/netbox_acls/tests/test_models.py index 945788d5..bfe8c967 100644 --- a/netbox_acls/tests/test_models.py +++ b/netbox_acls/tests/test_models.py @@ -298,7 +298,7 @@ def test_acl_vminterface_assignment_success(self): ) vm_acl.save() acl_vm_interface = ACLInterfaceAssignment( - access_list=vm_acl.pk, + access_list=vm_acl, direction="ingress", assigned_object_id=1, assigned_object_type=ContentType.objects.get_for_model(VMInterface), From 0194437d4fb1596f9f5d220988b50c0d232a1595 Mon Sep 17 00:00:00 2001 From: ryanmerolle Date: Sun, 5 Feb 2023 17:26:42 +0000 Subject: [PATCH 06/16] correct acl key --- netbox_acls/tests/test_models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netbox_acls/tests/test_models.py b/netbox_acls/tests/test_models.py index bfe8c967..76fd7015 100644 --- a/netbox_acls/tests/test_models.py +++ b/netbox_acls/tests/test_models.py @@ -277,7 +277,7 @@ def test_acl_interface_assignment_success(self): ) device_acl.save() acl_device_interface = ACLInterfaceAssignment( - access_list_id=device_acl.pk, + access_list=device_acl, direction="ingress", assigned_object_id=1, assigned_object_type=ContentType.objects.get_for_model(Interface), From 8f73a312d9091023efc5baef26d9ea00950fd238 Mon Sep 17 00:00:00 2001 From: ryanmerolle Date: Tue, 7 Feb 2023 05:09:59 +0000 Subject: [PATCH 07/16] draft host logic model --- netbox_acls/models/access_lists.py | 17 +++++++ netbox_acls/tests/test_models.py | 79 ++++++++++++++++++------------ 2 files changed, 66 insertions(+), 30 deletions(-) diff --git a/netbox_acls/models/access_lists.py b/netbox_acls/models/access_lists.py index 39a55f4a..76031fbb 100644 --- a/netbox_acls/models/access_lists.py +++ b/netbox_acls/models/access_lists.py @@ -5,6 +5,7 @@ from dcim.models import Device, Interface, VirtualChassis from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError from django.core.validators import RegexValidator from django.db import models from django.urls import reverse @@ -149,6 +150,22 @@ def get_absolute_url(self): args=[self.pk], ) + def clean(self): + super().clean() + + # Get the model type of the assigned interface. + if self.assigned_object_type.model_class() == VMInterface: + interface_host = self.assigned_object.virtual_machine + elif self.assigned_object_type.model_class() == Interface: + interface_host = self.assigned_object.device + # Check if the assigned interface's host is the same as the host assigned to the access list. + if interface_host != self.access_list.assigned_object: + raise ValidationError( + { + "assigned_object": "The assigned object must be the same as the device assigned to it." + } + ) + def get_direction_color(self): return ACLAssignmentDirectionChoices.colors.get(self.direction) diff --git a/netbox_acls/tests/test_models.py b/netbox_acls/tests/test_models.py index 76fd7015..8816b841 100644 --- a/netbox_acls/tests/test_models.py +++ b/netbox_acls/tests/test_models.py @@ -51,27 +51,28 @@ def setUpTestData(cls): device_type=devicetype, device_role=devicerole, ) - virtual_chassis = VirtualChassis.objects.create(name="Virtual Chassis 1") - virtual_chassis_member = Device.objects.create( - name="VC Device", - site=site, - device_type=devicetype, - device_role=devicerole, - virtual_chassis=virtual_chassis, - vc_position=1, - ) - cluster_member = Device.objects.create( - name="Cluster Device", - site=site, - device_type=devicetype, - device_role=devicerole, - ) - clustertype = ClusterType.objects.create(name="Cluster Type 1") - cluster = Cluster.objects.create( - name="Cluster 1", - type=clustertype, - ) + # virtual_chassis = VirtualChassis.objects.create(name="Virtual Chassis 1") + # virtual_chassis_member = Device.objects.create( + # name="VC Device", + # site=site, + # device_type=devicetype, + # device_role=devicerole, + # virtual_chassis=virtual_chassis, + # vc_position=1, + # ) + # cluster_member = Device.objects.create( + # name="Cluster Device", + # site=site, + # device_type=devicetype, + # device_role=devicerole, + # ) + # clustertype = ClusterType.objects.create(name="Cluster Type 1") + # cluster = Cluster.objects.create( + # name="Cluster 1", + # type=clustertype, + # ) virtual_machine = VirtualMachine.objects.create(name="VirtualMachine 1") + virtual_machine.save() prefix = Prefix.objects.create(prefix="10.0.0.0/8") @@ -256,12 +257,12 @@ def setUpTestData(cls): VMInterface(name="Interface 2", virtual_machine=virtual_machine), ) ) - prefixes = Prefix.objects.bulk_create( - ( - Prefix(prefix=IPNetwork("10.0.0.0/24")), - Prefix(prefix=IPNetwork("192.168.1.0/24")), - ) - ) + #prefixes = Prefix.objects.bulk_create( + # ( + # Prefix(prefix=IPNetwork("10.0.0.0/24")), + # Prefix(prefix=IPNetwork("192.168.1.0/24")), + # ) + #) def test_acl_interface_assignment_success(self): """ @@ -272,18 +273,36 @@ def test_acl_interface_assignment_success(self): comments="STANDARD_ACL", type="standard", default_action="permit", - assigned_object_id=1, - assigned_object_type=ContentType.objects.get_for_model(Device), + assigned_object=Device.objects.first(), ) device_acl.save() acl_device_interface = ACLInterfaceAssignment( access_list=device_acl, direction="ingress", - assigned_object_id=1, - assigned_object_type=ContentType.objects.get_for_model(Interface), + assigned_object=Interface.objects.first(), ) acl_device_interface.full_clean() + def test_aclinterface_assignment_fail(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the vminterface and direction. + """ + device_acl = AccessList( + name="STANDARD_ACL", + comments="STANDARD_ACL", + type="standard", + default_action="permit", + assigned_object=Device.objects.first(), + ) + device_acl.save() + acl_vm_interface = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=VMInterface.objects.first(), + ) + with self.assertRaises(ValidationError): + acl_vm_interface.full_clean() + def test_acl_vminterface_assignment_success(self): """ Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the vminterface and direction. From c5418f6da1a17c00836e85e70448ecc44233c95e Mon Sep 17 00:00:00 2001 From: Martin Hauser Date: Sun, 23 Mar 2025 12:29:12 +0100 Subject: [PATCH 08/16] chore(tests): move model tests to dedicated directory Renames `test_models.py` to `models/test_accesslists.py` to improve test organization and maintainability. This is a structural change with no modifications to test logic. --- netbox_acls/tests/models/__init__.py | 0 .../test_accesslists.py} | 16 +++++----------- 2 files changed, 5 insertions(+), 11 deletions(-) create mode 100644 netbox_acls/tests/models/__init__.py rename netbox_acls/tests/{test_models.py => models/test_accesslists.py} (96%) diff --git a/netbox_acls/tests/models/__init__.py b/netbox_acls/tests/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/netbox_acls/tests/test_models.py b/netbox_acls/tests/models/test_accesslists.py similarity index 96% rename from netbox_acls/tests/test_models.py rename to netbox_acls/tests/models/test_accesslists.py index 8816b841..64efa403 100644 --- a/netbox_acls/tests/test_models.py +++ b/netbox_acls/tests/models/test_accesslists.py @@ -178,17 +178,11 @@ def test_valid_acl_choices(self): valid_acl_default_action_choices = ["permit", "deny"] valid_acl_types = ["standard", "extended"] if len(valid_acl_default_action_choices) > len(valid_acl_types): - valid_acl_choices = list( - zip(valid_acl_default_action_choices, cycle(valid_acl_types)) - ) + valid_acl_choices = list(zip(valid_acl_default_action_choices, cycle(valid_acl_types))) elif len(valid_acl_default_action_choices) < len(valid_acl_types): - valid_acl_choices = list( - zip(cycle(valid_acl_default_action_choices), valid_acl_types) - ) + valid_acl_choices = list(zip(cycle(valid_acl_default_action_choices), valid_acl_types)) else: - valid_acl_choices = list( - zip(valid_acl_default_action_choices, valid_acl_types) - ) + valid_acl_choices = list(zip(valid_acl_default_action_choices, valid_acl_types)) for default_action, acl_type in valid_acl_choices: valid_acl_choice = AccessList( @@ -257,12 +251,12 @@ def setUpTestData(cls): VMInterface(name="Interface 2", virtual_machine=virtual_machine), ) ) - #prefixes = Prefix.objects.bulk_create( + # prefixes = Prefix.objects.bulk_create( # ( # Prefix(prefix=IPNetwork("10.0.0.0/24")), # Prefix(prefix=IPNetwork("192.168.1.0/24")), # ) - #) + # ) def test_acl_interface_assignment_success(self): """ From 8cde4f8aebb0a32ce59fcf4a395282d73587bc43 Mon Sep 17 00:00:00 2001 From: Martin Hauser Date: Sun, 23 Mar 2025 14:10:09 +0100 Subject: [PATCH 09/16] chore(tests): Split model tests into separate files --- netbox_acls/tests/models/base.py | 69 ++++++ netbox_acls/tests/models/test_accesslists.py | 212 +----------------- .../models/test_aclinterfaceassignments.py | 129 +++++++++++ .../tests/models/test_extendedrules.py | 9 + .../tests/models/test_standardrules.py | 9 + 5 files changed, 222 insertions(+), 206 deletions(-) create mode 100644 netbox_acls/tests/models/base.py create mode 100644 netbox_acls/tests/models/test_aclinterfaceassignments.py create mode 100644 netbox_acls/tests/models/test_extendedrules.py create mode 100644 netbox_acls/tests/models/test_standardrules.py diff --git a/netbox_acls/tests/models/base.py b/netbox_acls/tests/models/base.py new file mode 100644 index 00000000..a9e5bdc3 --- /dev/null +++ b/netbox_acls/tests/models/base.py @@ -0,0 +1,69 @@ +from dcim.models import ( + Device, + DeviceRole, + DeviceType, + Manufacturer, + Site, +) +from django.test import TestCase +from ipam.models import Prefix +from virtualization.models import VirtualMachine + + +class BaseTestCase(TestCase): + """ + Base test case for netbox_acls models. + """ + + @classmethod + def setUpTestData(cls): + """ + Create base data to test using including + - 1 of each of the following: test site, manufacturer, device type + device role, cluster type, cluster, virtual_chassis, and + virtual machine + - 2 devices, prefixes, 2 interfaces, and 2 vminterfaces + """ + + site = Site.objects.create(name="Site 1", slug="site-1") + manufacturer = Manufacturer.objects.create( + name="Manufacturer 1", + slug="manufacturer-1", + ) + devicetype = DeviceType.objects.create( + manufacturer=manufacturer, + model="Device Type 1", + ) + devicerole = DeviceRole.objects.create( + name="Device Role 1", + slug="device-role-1", + ) + device = Device.objects.create( + name="Device 1", + site=site, + device_type=devicetype, + device_role=devicerole, + ) + # virtual_chassis = VirtualChassis.objects.create(name="Virtual Chassis 1") + # virtual_chassis_member = Device.objects.create( + # name="VC Device", + # site=site, + # device_type=devicetype, + # device_role=devicerole, + # virtual_chassis=virtual_chassis, + # vc_position=1, + # ) + # cluster_member = Device.objects.create( + # name="Cluster Device", + # site=site, + # device_type=devicetype, + # device_role=devicerole, + # ) + # clustertype = ClusterType.objects.create(name="Cluster Type 1") + # cluster = Cluster.objects.create( + # name="Cluster 1", + # type=clustertype, + # ) + virtual_machine = VirtualMachine.objects.create(name="VirtualMachine 1") + virtual_machine.save() + prefix = Prefix.objects.create(prefix="10.0.0.0/8") diff --git a/netbox_acls/tests/models/test_accesslists.py b/netbox_acls/tests/models/test_accesslists.py index 64efa403..9f913408 100644 --- a/netbox_acls/tests/models/test_accesslists.py +++ b/netbox_acls/tests/models/test_accesslists.py @@ -1,79 +1,14 @@ from itertools import cycle -from dcim.models import ( - Device, - DeviceRole, - DeviceType, - Interface, - Manufacturer, - Site, - VirtualChassis, -) +from dcim.models import Device from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError -from django.test import TestCase from ipam.models import Prefix -from netaddr import IPNetwork -from virtualization.models import Cluster, ClusterType, VirtualMachine, VMInterface +from virtualization.models import VirtualMachine -from netbox_acls.models import * +from netbox_acls.models import AccessList - -class BaseTestCase(TestCase): - """ - Base test case for netbox_acls models. - """ - - @classmethod - def setUpTestData(cls): - """ - Create base data to test using including: - - 1 of each of the following: test site, manufacturer, device type, device role, cluster type, cluster, virtual_chassis, & virtual machine - - 2 devices, prefixes, 2 interfaces, and 2 vminterfaces - """ - - site = Site.objects.create(name="Site 1", slug="site-1") - manufacturer = Manufacturer.objects.create( - name="Manufacturer 1", - slug="manufacturer-1", - ) - devicetype = DeviceType.objects.create( - manufacturer=manufacturer, - model="Device Type 1", - ) - devicerole = DeviceRole.objects.create( - name="Device Role 1", - slug="device-role-1", - ) - device = Device.objects.create( - name="Device 1", - site=site, - device_type=devicetype, - device_role=devicerole, - ) - # virtual_chassis = VirtualChassis.objects.create(name="Virtual Chassis 1") - # virtual_chassis_member = Device.objects.create( - # name="VC Device", - # site=site, - # device_type=devicetype, - # device_role=devicerole, - # virtual_chassis=virtual_chassis, - # vc_position=1, - # ) - # cluster_member = Device.objects.create( - # name="Cluster Device", - # site=site, - # device_type=devicetype, - # device_role=devicerole, - # ) - # clustertype = ClusterType.objects.create(name="Cluster Type 1") - # cluster = Cluster.objects.create( - # name="Cluster 1", - # type=clustertype, - # ) - virtual_machine = VirtualMachine.objects.create(name="VirtualMachine 1") - virtual_machine.save() - prefix = Prefix.objects.create(prefix="10.0.0.0/8") +from .base import BaseTestCase class TestAccessList(BaseTestCase): @@ -115,7 +50,7 @@ def test_alphanumeric_plus_success(self): def test_duplicate_name_success(self): """ - Test that AccessList names can be non-unique if associated to different devices. + Test that AccessList names can be non-unique if associated with different devices. """ AccessList.objects.create( name="GOOD-DUPLICATE-ACL", @@ -140,7 +75,7 @@ def test_duplicate_name_success(self): def test_alphanumeric_plus_fail(self): """ - Test that AccessList names with non-alphanumeric (exluding '_' and '-') characters fail validation. + Test that AccessList names with non-alphanumeric (excluding '_' and '-') characters fail validation. """ non_alphanumeric_plus_chars = " !@#$%^&*()[]{};:,./<>?\|~=+" @@ -224,138 +159,3 @@ def test_invalid_acl_choices(self): ) with self.assertRaises(ValidationError): invalid_acl_type.full_clean() - - -class TestACLInterfaceAssignment(BaseTestCase): - """ - Test ACLInterfaceAssignment model. - """ - - @classmethod - def setUpTestData(cls): - """ - Extend BaseTestCase's setUpTestData() to create additional data for testing. - """ - super().setUpTestData() - device = Device.objects.first() - interfaces = Interface.objects.bulk_create( - ( - Interface(name="Interface 1", device=device, type="1000baset"), - Interface(name="Interface 2", device=device, type="1000baset"), - ) - ) - virtual_machine = VirtualMachine.objects.first() - vminterfaces = VMInterface.objects.bulk_create( - ( - VMInterface(name="Interface 1", virtual_machine=virtual_machine), - VMInterface(name="Interface 2", virtual_machine=virtual_machine), - ) - ) - # prefixes = Prefix.objects.bulk_create( - # ( - # Prefix(prefix=IPNetwork("10.0.0.0/24")), - # Prefix(prefix=IPNetwork("192.168.1.0/24")), - # ) - # ) - - def test_acl_interface_assignment_success(self): - """ - Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the interface and direction. - """ - device_acl = AccessList( - name="STANDARD_ACL", - comments="STANDARD_ACL", - type="standard", - default_action="permit", - assigned_object=Device.objects.first(), - ) - device_acl.save() - acl_device_interface = ACLInterfaceAssignment( - access_list=device_acl, - direction="ingress", - assigned_object=Interface.objects.first(), - ) - acl_device_interface.full_clean() - - def test_aclinterface_assignment_fail(self): - """ - Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the vminterface and direction. - """ - device_acl = AccessList( - name="STANDARD_ACL", - comments="STANDARD_ACL", - type="standard", - default_action="permit", - assigned_object=Device.objects.first(), - ) - device_acl.save() - acl_vm_interface = ACLInterfaceAssignment( - access_list=device_acl, - direction="ingress", - assigned_object=VMInterface.objects.first(), - ) - with self.assertRaises(ValidationError): - acl_vm_interface.full_clean() - - def test_acl_vminterface_assignment_success(self): - """ - Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the vminterface and direction. - """ - vm_acl = AccessList( - name="STANDARD_ACL", - comments="STANDARD_ACL", - type="standard", - default_action="permit", - assigned_object_id=1, - assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), - ) - vm_acl.save() - acl_vm_interface = ACLInterfaceAssignment( - access_list=vm_acl, - direction="ingress", - assigned_object_id=1, - assigned_object_type=ContentType.objects.get_for_model(VMInterface), - ) - acl_vm_interface.full_clean() - - def test_acl_interface_assignment_fail(self): - """ - Test that ACLInterfaceAssignment fails validation if the ACL is not assigned to the parent host. - """ - pass - # TODO: test_acl_interface_assignment_fail - VM & Device - - def test_duplicate_assignment_fail(self): - """ - Test that ACLInterfaceAssignment fails validation if the ACL already is assigned to the same interface and direction. - """ - pass - # TODO: test_duplicate_assignment_fail - VM & Device - - def test_acl_already_assinged_fail(self): - """ - Test that ACLInterfaceAssignment fails validation if the interface already has an ACL assigned in the same direction. - """ - pass - ## TODO: test_acl_already_assinged_fail - VM & Device - - # TODO: Test choices for ACLInterfaceAssignment Model - - -# TODO: Investigate a Base model for ACLStandardRule and ACLExtendedRule - - -class TestACLStandardRule(BaseTestCase): - """ - Test ACLStandardRule model. - """ - - # TODO: Develop tests for ACLStandardRule model - - -class ACLExtendedRule(BaseTestCase): - """ - Test ACLExtendedRule model. - """ - - # TODO: Develop tests for ACLExtendedRule model diff --git a/netbox_acls/tests/models/test_aclinterfaceassignments.py b/netbox_acls/tests/models/test_aclinterfaceassignments.py new file mode 100644 index 00000000..d1f192bb --- /dev/null +++ b/netbox_acls/tests/models/test_aclinterfaceassignments.py @@ -0,0 +1,129 @@ +from dcim.models import Device, Interface +from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError +from virtualization.models import VirtualMachine, VMInterface + +from netbox_acls.models import AccessList, ACLInterfaceAssignment + +from .base import BaseTestCase + + +class TestACLInterfaceAssignment(BaseTestCase): + """ + Test ACLInterfaceAssignment model. + """ + + @classmethod + def setUpTestData(cls): + """ + Extend BaseTestCase's setUpTestData() to create additional data for testing. + """ + super().setUpTestData() + device = Device.objects.first() + interfaces = Interface.objects.bulk_create( + ( + Interface(name="Interface 1", device=device, type="1000baset"), + Interface(name="Interface 2", device=device, type="1000baset"), + ) + ) + virtual_machine = VirtualMachine.objects.first() + vminterfaces = VMInterface.objects.bulk_create( + ( + VMInterface(name="Interface 1", virtual_machine=virtual_machine), + VMInterface(name="Interface 2", virtual_machine=virtual_machine), + ) + ) + # prefixes = Prefix.objects.bulk_create( + # ( + # Prefix(prefix=IPNetwork("10.0.0.0/24")), + # Prefix(prefix=IPNetwork("192.168.1.0/24")), + # ) + # ) + + def test_acl_interface_assignment_success(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host + and not already assigned to the interface and direction. + """ + device_acl = AccessList( + name="STANDARD_ACL", + comments="STANDARD_ACL", + type="standard", + default_action="permit", + assigned_object=Device.objects.first(), + ) + device_acl.save() + acl_device_interface = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=Interface.objects.first(), + ) + acl_device_interface.full_clean() + + def test_aclinterface_assignment_fail(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host + and not already assigned to the vminterface and direction. + """ + device_acl = AccessList( + name="STANDARD_ACL", + comments="STANDARD_ACL", + type="standard", + default_action="permit", + assigned_object=Device.objects.first(), + ) + device_acl.save() + acl_vm_interface = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=VMInterface.objects.first(), + ) + with self.assertRaises(ValidationError): + acl_vm_interface.full_clean() + + def test_acl_vminterface_assignment_success(self): + """ + Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host + and not already assigned to the vminterface and direction. + """ + vm_acl = AccessList( + name="STANDARD_ACL", + comments="STANDARD_ACL", + type="standard", + default_action="permit", + assigned_object_id=1, + assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), + ) + vm_acl.save() + acl_vm_interface = ACLInterfaceAssignment( + access_list=vm_acl, + direction="ingress", + assigned_object_id=1, + assigned_object_type=ContentType.objects.get_for_model(VMInterface), + ) + acl_vm_interface.full_clean() + + def test_acl_interface_assignment_fail(self): + """ + Test that ACLInterfaceAssignment fails validation if the ACL is not assigned to the parent host. + """ + pass + # TODO: test_acl_interface_assignment_fail - VM & Device + + def test_duplicate_assignment_fail(self): + """ + Test that ACLInterfaceAssignment fails validation + if the ACL already is assigned to the same interface and direction. + """ + pass + # TODO: test_duplicate_assignment_fail - VM & Device + + def test_acl_already_assigned_fail(self): + """ + Test that ACLInterfaceAssignment fails validation + if the interface already has an ACL assigned in the same direction. + """ + pass + # TODO: test_acl_already_assigned_fail - VM & Device + + # TODO: Test choices for ACLInterfaceAssignment Model diff --git a/netbox_acls/tests/models/test_extendedrules.py b/netbox_acls/tests/models/test_extendedrules.py new file mode 100644 index 00000000..1efc248d --- /dev/null +++ b/netbox_acls/tests/models/test_extendedrules.py @@ -0,0 +1,9 @@ +from .base import BaseTestCase + + +class ACLExtendedRule(BaseTestCase): + """ + Test ACLExtendedRule model. + """ + + # TODO: Develop tests for ACLExtendedRule model diff --git a/netbox_acls/tests/models/test_standardrules.py b/netbox_acls/tests/models/test_standardrules.py new file mode 100644 index 00000000..ab68401b --- /dev/null +++ b/netbox_acls/tests/models/test_standardrules.py @@ -0,0 +1,9 @@ +from .base import BaseTestCase + + +class TestACLStandardRule(BaseTestCase): + """ + Test ACLStandardRule model. + """ + + # TODO: Develop tests for ACLStandardRule model From 51bd56a625f2417f4ae1439fe88bff4812e7dc4f Mon Sep 17 00:00:00 2001 From: Martin Hauser Date: Sun, 23 Mar 2025 16:33:22 +0100 Subject: [PATCH 10/16] chore(tests): Enhance test fixtures Refactor test setup to improve data creation and reusability. Add detailed fixtures for devices, virtual chassis, clusters, virtual machines, and prefixes. Update all assignments to reference the new fixtures for better consistency. --- netbox_acls/tests/models/base.py | 112 +++++++++++++----- netbox_acls/tests/models/test_accesslists.py | 49 +++----- .../models/test_aclinterfaceassignments.py | 69 +++++------ 3 files changed, 134 insertions(+), 96 deletions(-) diff --git a/netbox_acls/tests/models/base.py b/netbox_acls/tests/models/base.py index a9e5bdc3..55d5d0b5 100644 --- a/netbox_acls/tests/models/base.py +++ b/netbox_acls/tests/models/base.py @@ -4,10 +4,11 @@ DeviceType, Manufacturer, Site, + VirtualChassis, ) from django.test import TestCase from ipam.models import Prefix -from virtualization.models import VirtualMachine +from virtualization.models import Cluster, ClusterType, VirtualMachine class BaseTestCase(TestCase): @@ -20,50 +21,97 @@ def setUpTestData(cls): """ Create base data to test using including - 1 of each of the following: test site, manufacturer, device type - device role, cluster type, cluster, virtual_chassis, and + device role, cluster type, cluster, virtual chassis, and virtual machine - - 2 devices, prefixes, 2 interfaces, and 2 vminterfaces + - 2 of each Device, prefix """ - site = Site.objects.create(name="Site 1", slug="site-1") + # Sites + site = Site.objects.create( + name="Site 1", + slug="site-1", + ) + + # Device Types manufacturer = Manufacturer.objects.create( name="Manufacturer 1", slug="manufacturer-1", ) - devicetype = DeviceType.objects.create( + device_type = DeviceType.objects.create( manufacturer=manufacturer, model="Device Type 1", ) - devicerole = DeviceRole.objects.create( + + # Device Roles + device_role = DeviceRole.objects.create( name="Device Role 1", slug="device-role-1", ) - device = Device.objects.create( + + # Devices + cls.device1 = Device.objects.create( name="Device 1", site=site, - device_type=devicetype, - device_role=devicerole, + device_type=device_type, + role=device_role, + ) + cls.device2 = Device.objects.create( + name="Device 2", + site=site, + device_type=device_type, + role=device_role, + ) + + # Virtual Chassis + cls.virtual_chassis1 = VirtualChassis.objects.create( + name="Virtual Chassis 1", + ) + + # Virtual Chassis Members + cls.virtual_chassis_member1 = Device.objects.create( + name="VC Device", + site=site, + device_type=device_type, + role=device_role, + virtual_chassis=cls.virtual_chassis1, + vc_position=1, + ) + + # Virtualization Cluster Type + cluster_type = ClusterType.objects.create( + name="Cluster Type 1", + ) + + # Virtualization Cluster + cluster = Cluster.objects.create( + name="Cluster 1", + type=cluster_type, + ) + + # Virtualization Cluster Member + cls.cluster_member1 = Device.objects.create( + name="Cluster Device", + site=site, + device_type=device_type, + role=device_role, + ) + + # Virtual Machine + cls.virtual_machine1 = VirtualMachine.objects.create( + name="VirtualMachine 1", + status="active", + cluster=cluster, + ) + cls.virtual_machine2 = VirtualMachine.objects.create( + name="VirtualMachine 2", + status="active", + cluster=cluster, + ) + + # Prefix + cls.prefix1 = Prefix.objects.create( + prefix="10.1.0.0/16", + ) + cls.prefix2 = Prefix.objects.create( + prefix="10.2.0.0/16", ) - # virtual_chassis = VirtualChassis.objects.create(name="Virtual Chassis 1") - # virtual_chassis_member = Device.objects.create( - # name="VC Device", - # site=site, - # device_type=devicetype, - # device_role=devicerole, - # virtual_chassis=virtual_chassis, - # vc_position=1, - # ) - # cluster_member = Device.objects.create( - # name="Cluster Device", - # site=site, - # device_type=devicetype, - # device_role=devicerole, - # ) - # clustertype = ClusterType.objects.create(name="Cluster Type 1") - # cluster = Cluster.objects.create( - # name="Cluster 1", - # type=clustertype, - # ) - virtual_machine = VirtualMachine.objects.create(name="VirtualMachine 1") - virtual_machine.save() - prefix = Prefix.objects.create(prefix="10.0.0.0/8") diff --git a/netbox_acls/tests/models/test_accesslists.py b/netbox_acls/tests/models/test_accesslists.py index 9f913408..aac6bdd3 100644 --- a/netbox_acls/tests/models/test_accesslists.py +++ b/netbox_acls/tests/models/test_accesslists.py @@ -4,7 +4,6 @@ from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError from ipam.models import Prefix -from virtualization.models import VirtualMachine from netbox_acls.models import AccessList @@ -20,16 +19,16 @@ class TestAccessList(BaseTestCase): "type": "extended", "default_action": "permit", } - # device = Device.objects.first() def test_wrong_assigned_object_type_fail(self): """ - Test that AccessList cannot be assigned to an object type other than Device, VirtualChassis, VirtualMachine, or Cluster. + Test that AccessList cannot be assigned to an object type other than Device, VirtualChassis, VirtualMachine, + or Cluster. """ acl_bad_gfk = AccessList( name="TestACL_Wrong_GFK", assigned_object_type=ContentType.objects.get_for_model(Prefix), - assigned_object_id=Prefix.objects.first(), + assigned_object_id=self.prefix1.id, **self.common_acl_params, ) with self.assertRaises(ValidationError): @@ -40,31 +39,24 @@ def test_alphanumeric_plus_success(self): Test that AccessList names with alphanumeric characters, '_', or '-' pass validation. """ acl_good_name = AccessList( - name="Testacl-Good_Name-1", + name="Test-ACL-Good_Name-1", assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, # TODO - replace with Device.objects.first() + assigned_object_id=self.device1.id, **self.common_acl_params, ) acl_good_name.full_clean() - # TODO: test_alphanumeric_plus_success - VirtualChassis, VirtualMachine & Cluster def test_duplicate_name_success(self): """ Test that AccessList names can be non-unique if associated with different devices. """ - AccessList.objects.create( + # Device + device_acl = AccessList( name="GOOD-DUPLICATE-ACL", - assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, # TODO - replace with Device.objects.first() - **self.common_acl_params, - ) - vm_acl = AccessList( - name="GOOD-DUPLICATE-ACL", - assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), - assigned_object_id=1, # TODO - replace with VirtualMachine.objects.first().id, + assigned_object=self.device1, **self.common_acl_params, ) - vm_acl.full_clean() + device_acl.full_clean() # TODO: test_duplicate_name_success - VirtualChassis, VirtualMachine & Cluster # vc_acl = AccessList( # "name": "GOOD-DUPLICATE-ACL", @@ -81,23 +73,23 @@ def test_alphanumeric_plus_fail(self): for i, char in enumerate(non_alphanumeric_plus_chars, start=1): bad_acl_name = AccessList( - name=f"Testacl-bad_name_{i}_{char}", - assigned_object_type=ContentType.objects.get_for_model(Device), + name=f"Test-ACL-bad_name_{i}_{char}", + assigned_object=self.device1, comments=f'ACL with "{char}" in name', **self.common_acl_params, ) with self.assertRaises(ValidationError): bad_acl_name.full_clean() - def test_duplicate_name_fail(self): + def test_duplicate_name_per_device_fail(self): """ Test that AccessList names must be unique per device. """ params = { "name": "FAIL-DUPLICATE-ACL", "assigned_object_type": ContentType.objects.get_for_model(Device), + "assigned_object_id": self.device1.id, **self.common_acl_params, - "assigned_object_id": 1, # TODO - replace with Device.objects.first() } acl_1 = AccessList.objects.create(**params) acl_1.save() @@ -122,11 +114,10 @@ def test_valid_acl_choices(self): for default_action, acl_type in valid_acl_choices: valid_acl_choice = AccessList( name=f"TestACL_Valid_Choice_{default_action}_{acl_type}", - comments=f"VALID ACL CHOICES USED: {default_action=} {acl_type=}", + assigned_object=self.device1, type=acl_type, default_action=default_action, - assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, # TODO - replace with Device.objects.first() + comments=f"VALID ACL CHOICES USED: {default_action=} {acl_type=}", ) valid_acl_choice.full_clean() @@ -138,11 +129,10 @@ def test_invalid_acl_choices(self): invalid_acl_default_action_choice = "log" invalid_acl_default_action = AccessList( name=f"TestACL_Valid_Choice_{invalid_acl_default_action_choice}_{valid_acl_types[0]}", - comments=f"INVALID ACL DEFAULT CHOICE USED: default_action='{invalid_acl_default_action_choice}'", + assigned_object=self.device1, type=valid_acl_types[0], default_action=invalid_acl_default_action_choice, - assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, # TODO - replace with Device.objects.first() + comments=f"INVALID ACL DEFAULT CHOICE USED: default_action='{invalid_acl_default_action_choice}'", ) with self.assertRaises(ValidationError): invalid_acl_default_action.full_clean() @@ -151,11 +141,10 @@ def test_invalid_acl_choices(self): invalid_acl_type = "super-dupper-extended" invalid_acl_type = AccessList( name=f"TestACL_Valid_Choice_{valid_acl_default_action_choices[0]}_{invalid_acl_type}", - comments=f"INVALID ACL DEFAULT CHOICE USED: type='{invalid_acl_type}'", + assigned_object=self.device1, type=invalid_acl_type, default_action=valid_acl_default_action_choices[0], - assigned_object_type=ContentType.objects.get_for_model(Device), - assigned_object_id=1, # TODO - replace with Device.objects.first() + comments=f"INVALID ACL DEFAULT CHOICE USED: type='{invalid_acl_type}'", ) with self.assertRaises(ValidationError): invalid_acl_type.full_clean() diff --git a/netbox_acls/tests/models/test_aclinterfaceassignments.py b/netbox_acls/tests/models/test_aclinterfaceassignments.py index d1f192bb..f8b3774b 100644 --- a/netbox_acls/tests/models/test_aclinterfaceassignments.py +++ b/netbox_acls/tests/models/test_aclinterfaceassignments.py @@ -1,7 +1,6 @@ -from dcim.models import Device, Interface -from django.contrib.contenttypes.models import ContentType +from dcim.models import Interface from django.core.exceptions import ValidationError -from virtualization.models import VirtualMachine, VMInterface +from virtualization.models import VMInterface from netbox_acls.models import AccessList, ACLInterfaceAssignment @@ -19,26 +18,30 @@ def setUpTestData(cls): Extend BaseTestCase's setUpTestData() to create additional data for testing. """ super().setUpTestData() - device = Device.objects.first() - interfaces = Interface.objects.bulk_create( - ( - Interface(name="Interface 1", device=device, type="1000baset"), - Interface(name="Interface 2", device=device, type="1000baset"), - ) + + interface_type = "1000baset" + + # Device Interfaces + cls.device_interface1 = Interface.objects.create( + name="Interface 1", + device=cls.device1, + type=interface_type, ) - virtual_machine = VirtualMachine.objects.first() - vminterfaces = VMInterface.objects.bulk_create( - ( - VMInterface(name="Interface 1", virtual_machine=virtual_machine), - VMInterface(name="Interface 2", virtual_machine=virtual_machine), - ) + cls.device_interface2 = Interface.objects.create( + name="Interface 2", + device=cls.device1, + type=interface_type, + ) + + # Virtual Machine Interfaces + cls.vm_interface1 = VMInterface.objects.create( + name="Interface 1", + virtual_machine=cls.virtual_machine1, + ) + cls.vm_interface2 = VMInterface.objects.create( + name="Interface 2", + virtual_machine=cls.virtual_machine1, ) - # prefixes = Prefix.objects.bulk_create( - # ( - # Prefix(prefix=IPNetwork("10.0.0.0/24")), - # Prefix(prefix=IPNetwork("192.168.1.0/24")), - # ) - # ) def test_acl_interface_assignment_success(self): """ @@ -47,36 +50,36 @@ def test_acl_interface_assignment_success(self): """ device_acl = AccessList( name="STANDARD_ACL", - comments="STANDARD_ACL", + assigned_object=self.device1, type="standard", default_action="permit", - assigned_object=Device.objects.first(), + comments="STANDARD_ACL", ) device_acl.save() acl_device_interface = ACLInterfaceAssignment( access_list=device_acl, direction="ingress", - assigned_object=Interface.objects.first(), + assigned_object=self.device_interface1, ) acl_device_interface.full_clean() - def test_aclinterface_assignment_fail(self): + def test_acl_interface_assignment_fail(self): """ Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host and not already assigned to the vminterface and direction. """ device_acl = AccessList( name="STANDARD_ACL", - comments="STANDARD_ACL", + assigned_object=self.device1, type="standard", default_action="permit", - assigned_object=Device.objects.first(), + comments="STANDARD_ACL", ) device_acl.save() acl_vm_interface = ACLInterfaceAssignment( access_list=device_acl, direction="ingress", - assigned_object=VMInterface.objects.first(), + assigned_object=self.vm_interface1, ) with self.assertRaises(ValidationError): acl_vm_interface.full_clean() @@ -88,22 +91,20 @@ def test_acl_vminterface_assignment_success(self): """ vm_acl = AccessList( name="STANDARD_ACL", - comments="STANDARD_ACL", + assigned_object=self.virtual_machine1, type="standard", default_action="permit", - assigned_object_id=1, - assigned_object_type=ContentType.objects.get_for_model(VirtualMachine), + comments="STANDARD_ACL", ) vm_acl.save() acl_vm_interface = ACLInterfaceAssignment( access_list=vm_acl, direction="ingress", - assigned_object_id=1, - assigned_object_type=ContentType.objects.get_for_model(VMInterface), + assigned_object=self.vm_interface1, ) acl_vm_interface.full_clean() - def test_acl_interface_assignment_fail(self): + def test_acl_host_interface_assignment_fail(self): """ Test that ACLInterfaceAssignment fails validation if the ACL is not assigned to the parent host. """ From 624f796cc6bed17bfd6a4803b57f08ed5d375fc5 Mon Sep 17 00:00:00 2001 From: Martin Hauser Date: Sun, 23 Mar 2025 16:35:42 +0100 Subject: [PATCH 11/16] chore(tests): Add tests for ACL creation Add test cases to validate AccessList creation and uniqueness for VirtualChassis and VirtualMachine objects. Extend coverage by ensuring proper handling of duplicate names and enforcing unique constraints. --- netbox_acls/tests/models/test_accesslists.py | 137 +++++++++++++++++-- 1 file changed, 128 insertions(+), 9 deletions(-) diff --git a/netbox_acls/tests/models/test_accesslists.py b/netbox_acls/tests/models/test_accesslists.py index aac6bdd3..579bd719 100644 --- a/netbox_acls/tests/models/test_accesslists.py +++ b/netbox_acls/tests/models/test_accesslists.py @@ -1,9 +1,10 @@ from itertools import cycle -from dcim.models import Device +from dcim.models import Device, VirtualChassis from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError from ipam.models import Prefix +from virtualization.models import VirtualMachine from netbox_acls.models import AccessList @@ -20,6 +21,84 @@ class TestAccessList(BaseTestCase): "default_action": "permit", } + def test_accesslist_standard_creation(self): + """ + Test that AccessList Standard creation passes validation. + """ + acl_name = "Test-ACL-Standard-Type" + + created_acl = AccessList( + name=acl_name, + assigned_object=self.device1, + type="standard", + default_action="deny", + ) + + self.assertTrue(isinstance(created_acl, AccessList), True) + self.assertEqual(created_acl.name, acl_name) + self.assertEqual(created_acl.type, "standard") + self.assertEqual(created_acl.default_action, "deny") + self.assertEqual(isinstance(created_acl.assigned_object, Device), True) + self.assertEqual(created_acl.assigned_object, self.device1) + + def test_accesslist_extended_creation(self): + """ + Test that AccessList Extended creation passes validation. + """ + acl_name = "Test-ACL-Extended-Type" + + created_acl = AccessList( + name=acl_name, + assigned_object=self.device2, + type="extended", + default_action="permit", + ) + + self.assertTrue(isinstance(created_acl, AccessList)) + self.assertEqual(created_acl.name, acl_name) + self.assertEqual(created_acl.type, "extended") + self.assertEqual(created_acl.default_action, "permit") + self.assertEqual(isinstance(created_acl.assigned_object, Device), True) + self.assertEqual(created_acl.assigned_object, self.device2) + + def test_accesslist_creation_with_virtual_chassis(self): + """ + Test that AccessList creation with an assigned virtual chassis passes validation. + """ + acl_name = "Test-ACL-with-Virtual-Machine" + + created_acl = AccessList( + name=acl_name, + assigned_object=self.virtual_chassis1, + **self.common_acl_params, + ) + + self.assertTrue(isinstance(created_acl, AccessList)) + self.assertEqual(created_acl.name, acl_name) + self.assertEqual(created_acl.type, "extended") + self.assertEqual(created_acl.default_action, "permit") + self.assertEqual(isinstance(created_acl.assigned_object, VirtualChassis), True) + self.assertEqual(created_acl.assigned_object, self.virtual_chassis1) + + def test_accesslist_creation_with_virtual_machine(self): + """ + Test that AccessList creation with an assigned virtual machine passes validation. + """ + acl_name = "Test-ACL-with-Virtual-Machine" + + created_acl = AccessList( + name=acl_name, + assigned_object=self.virtual_machine1, + **self.common_acl_params, + ) + + self.assertTrue(isinstance(created_acl, AccessList)) + self.assertEqual(created_acl.name, acl_name) + self.assertEqual(created_acl.type, "extended") + self.assertEqual(created_acl.default_action, "permit") + self.assertEqual(isinstance(created_acl.assigned_object, VirtualMachine), True) + self.assertEqual(created_acl.assigned_object, self.virtual_machine1) + def test_wrong_assigned_object_type_fail(self): """ Test that AccessList cannot be assigned to an object type other than Device, VirtualChassis, VirtualMachine, @@ -57,13 +136,22 @@ def test_duplicate_name_success(self): **self.common_acl_params, ) device_acl.full_clean() - # TODO: test_duplicate_name_success - VirtualChassis, VirtualMachine & Cluster - # vc_acl = AccessList( - # "name": "GOOD-DUPLICATE-ACL", - # assigned_object_type=ContentType.objects.get_for_model(VirtualChassis), - # **self.common_acl_params, - # ) - # vc_acl.full_clean() + + # Virtual Chassis + vc_acl = AccessList( + name="GOOD-DUPLICATE-ACL", + assigned_object=self.virtual_chassis1, + **self.common_acl_params, + ) + vc_acl.full_clean() + + # Virtual Machine + vm_acl = AccessList( + name="GOOD-DUPLICATE-ACL", + assigned_object=self.virtual_machine1, + **self.common_acl_params, + ) + vm_acl.full_clean() def test_alphanumeric_plus_fail(self): """ @@ -96,7 +184,38 @@ def test_duplicate_name_per_device_fail(self): acl_2 = AccessList(**params) with self.assertRaises(ValidationError): acl_2.full_clean() - # TODO: test_duplicate_name_fail - VirtualChassis & Cluster + + def test_duplicate_name_per_virtual_chassis_fail(self): + """ + Test that AccessList names must be unique per virtual chassis. + """ + params = { + "name": "FAIL-DUPLICATE-ACL", + "assigned_object_type": ContentType.objects.get_for_model(VirtualChassis), + "assigned_object_id": self.virtual_chassis1.id, + **self.common_acl_params, + } + acl_1 = AccessList.objects.create(**params) + acl_1.save() + acl_2 = AccessList(**params) + with self.assertRaises(ValidationError): + acl_2.full_clean() + + def test_duplicate_name_per_virtual_machine_fail(self): + """ + Test that AccessList names must be unique per virtual machine. + """ + params = { + "name": "FAIL-DUPLICATE-ACL", + "assigned_object_type": ContentType.objects.get_for_model(VirtualMachine), + "assigned_object_id": self.virtual_machine1.id, + **self.common_acl_params, + } + acl_1 = AccessList.objects.create(**params) + acl_1.save() + acl_2 = AccessList(**params) + with self.assertRaises(ValidationError): + acl_2.full_clean() def test_valid_acl_choices(self): """ From 560d93739bbfb29d444a8a884b25eb6badf4cca7 Mon Sep 17 00:00:00 2001 From: Martin Hauser Date: Tue, 25 Mar 2025 19:22:48 +0100 Subject: [PATCH 12/16] fix(models): Ensure interface host matches access list host Move validation to the `save` method to enforce that the assigned interface's host matches the access list's host before saving. This replaces validation in the `clean` method for better data integrity enforcement. --- netbox_acls/models/access_lists.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/netbox_acls/models/access_lists.py b/netbox_acls/models/access_lists.py index 76031fbb..846333d4 100644 --- a/netbox_acls/models/access_lists.py +++ b/netbox_acls/models/access_lists.py @@ -150,22 +150,18 @@ def get_absolute_url(self): args=[self.pk], ) - def clean(self): - super().clean() - - # Get the model type of the assigned interface. - if self.assigned_object_type.model_class() == VMInterface: - interface_host = self.assigned_object.virtual_machine - elif self.assigned_object_type.model_class() == Interface: - interface_host = self.assigned_object.device - # Check if the assigned interface's host is the same as the host assigned to the access list. - if interface_host != self.access_list.assigned_object: + def save(self, *args, **kwargs): + """Saves the current instance to the database.""" + # Ensure the assigned interface's host matches the host assigned to the access list. + if self.assigned_object.parent_object != self.access_list.assigned_object: raise ValidationError( { "assigned_object": "The assigned object must be the same as the device assigned to it." } ) + super().save(*args, **kwargs) + def get_direction_color(self): return ACLAssignmentDirectionChoices.colors.get(self.direction) From 6a2ee86ee010f1295e61dbf78a66fa6cd6704629 Mon Sep 17 00:00:00 2001 From: Martin Hauser Date: Tue, 25 Mar 2025 23:04:58 +0100 Subject: [PATCH 13/16] chore(tests): Enhance ACLInterfaceAssignment tests Improve test coverage for ACLInterfaceAssignment validation. Add tests for valid and invalid direction choices, duplicate assignments, and missing host assignments to ensure robust validation logic. --- .../models/test_aclinterfaceassignments.py | 84 ++++++++++++++++--- 1 file changed, 72 insertions(+), 12 deletions(-) diff --git a/netbox_acls/tests/models/test_aclinterfaceassignments.py b/netbox_acls/tests/models/test_aclinterfaceassignments.py index f8b3774b..bd2cb0f9 100644 --- a/netbox_acls/tests/models/test_aclinterfaceassignments.py +++ b/netbox_acls/tests/models/test_aclinterfaceassignments.py @@ -65,8 +65,8 @@ def test_acl_interface_assignment_success(self): def test_acl_interface_assignment_fail(self): """ - Test that ACLInterfaceAssignment passes validation if the ACL is assigned to the host - and not already assigned to the vminterface and direction. + Test that ACLInterfaceAssignment fails validation if the ACL is not + assigned to the parent host. """ device_acl = AccessList( name="STANDARD_ACL", @@ -83,6 +83,7 @@ def test_acl_interface_assignment_fail(self): ) with self.assertRaises(ValidationError): acl_vm_interface.full_clean() + acl_vm_interface.save() def test_acl_vminterface_assignment_success(self): """ @@ -104,20 +105,33 @@ def test_acl_vminterface_assignment_success(self): ) acl_vm_interface.full_clean() - def test_acl_host_interface_assignment_fail(self): - """ - Test that ACLInterfaceAssignment fails validation if the ACL is not assigned to the parent host. - """ - pass - # TODO: test_acl_interface_assignment_fail - VM & Device - def test_duplicate_assignment_fail(self): """ Test that ACLInterfaceAssignment fails validation if the ACL already is assigned to the same interface and direction. """ - pass - # TODO: test_duplicate_assignment_fail - VM & Device + device_acl = AccessList( + name="STANDARD_ACL", + assigned_object=self.device1, + type="standard", + default_action="permit", + comments="STANDARD_ACL", + ) + device_acl.save() + acl_device_interface1 = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=self.device_interface1, + ) + acl_device_interface1.full_clean() + acl_device_interface1.save() + acl_device_interface2 = ACLInterfaceAssignment( + access_list=device_acl, + direction="ingress", + assigned_object=self.device_interface1, + ) + with self.assertRaises(ValidationError): + acl_device_interface2.full_clean() def test_acl_already_assigned_fail(self): """ @@ -127,4 +141,50 @@ def test_acl_already_assigned_fail(self): pass # TODO: test_acl_already_assigned_fail - VM & Device - # TODO: Test choices for ACLInterfaceAssignment Model + def test_valid_acl_interface_assignment_choices(self): + """ + Test that ACLInterfaceAssignment action choices using VALID choices. + """ + valid_acl_assignment_direction_choices = ["ingress", "egress"] + + test_acl = AccessList( + name="STANDARD_ACL", + assigned_object=self.device1, + type="standard", + default_action="permit", + comments="STANDARD_ACL", + ) + test_acl.save() + + for direction_choice in valid_acl_assignment_direction_choices: + valid_acl_assignment = ACLInterfaceAssignment( + access_list=test_acl, + direction=direction_choice, + assigned_object=self.device_interface1, + comments=f"VALID ACL ASSIGNMENT CHOICES USED: direction={direction_choice}", + ) + valid_acl_assignment.full_clean() + + def test_invalid_acl_choices(self): + """ + Test that ACLInterfaceAssignment action choices using INVALID choices. + """ + invalid_acl_assignment_direction_choice = "both" + + test_acl = AccessList( + name="STANDARD_ACL", + assigned_object=self.device1, + type="standard", + default_action="permit", + comments="STANDARD_ACL", + ) + test_acl.save() + + invalid_acl_assignment_direction = ACLInterfaceAssignment( + access_list=test_acl, + direction=invalid_acl_assignment_direction_choice, + assigned_object=self.device_interface1, + comments=f"INVALID ACL DEFAULT CHOICE USED: default_action='{invalid_acl_assignment_direction_choice}'", + ) + with self.assertRaises(ValidationError): + invalid_acl_assignment_direction.full_clean() From 871b0b7406f5f0fe8714f89fd665ef0abbcdd0a4 Mon Sep 17 00:00:00 2001 From: Martin Hauser Date: Thu, 27 Mar 2025 21:48:17 +0100 Subject: [PATCH 14/16] refactor(models): Centralize ACL rule validation Moves ACL rule validation logic from forms to model methods to ensure consistent enforcement across all usages. This improves maintainability and eliminates redundant validation code. --- netbox_acls/forms/models.py | 78 ---------------------- netbox_acls/models/access_list_rules.py | 88 +++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 78 deletions(-) diff --git a/netbox_acls/forms/models.py b/netbox_acls/forms/models.py index 454c8782..42450a70 100644 --- a/netbox_acls/forms/models.py +++ b/netbox_acls/forms/models.py @@ -43,13 +43,6 @@ # Sets a standard help_text value to be used by the various classes for acl index help_text_acl_rule_index = "Determines the order of the rule in the ACL processing. AKA Sequence Number." -# Sets a standard error message for ACL rules with an action of remark, but no remark set. -error_message_no_remark = "Action is set to remark, you MUST add a remark." -# Sets a standard error message for ACL rules with an action of remark, but no source_prefix is set. -error_message_action_remark_source_prefix_set = "Action is set to remark, Source Prefix CANNOT be set." -# Sets a standard error message for ACL rules with an action not set to remark, but no remark is set. -error_message_remark_without_action_remark = "CANNOT set remark unless action is set to remark." - class AccessListForm(NetBoxModelForm): """ @@ -545,35 +538,6 @@ class Meta: ), } - def clean(self): - """ - Validates form inputs before submitting: - - Check if action set to remark, but no remark set. - - Check if action set to remark, but source_prefix set. - - Check remark set, but action not set to remark. - """ - super().clean() - cleaned_data = self.cleaned_data - error_message = {} - - action = cleaned_data.get("action") - remark = cleaned_data.get("remark") - source_prefix = cleaned_data.get("source_prefix") - - if action == "remark": - # Check if action set to remark, but no remark set. - if not remark: - error_message["remark"] = [error_message_no_remark] - # Check if action set to remark, but source_prefix set. - if source_prefix: - error_message["source_prefix"] = [error_message_action_remark_source_prefix_set] - # Check remark set, but action not set to remark. - elif remark: - error_message["remark"] = [error_message_remark_without_action_remark] - - if error_message: - raise ValidationError(error_message) - class ACLExtendedRuleForm(NetBoxModelForm): """ @@ -651,45 +615,3 @@ class Meta: ), "source_ports": help_text_acl_rule_logic, } - - def clean(self): - """ - Validates form inputs before submitting: - - Check if action set to remark, but no remark set. - - Check if action set to remark, but source_prefix set. - - Check if action set to remark, but source_ports set. - - Check if action set to remark, but destination_prefix set. - - Check if action set to remark, but destination_ports set. - - Check if action set to remark, but protocol set. - - Check remark set, but action not set to remark. - """ - super().clean() - cleaned_data = self.cleaned_data - error_message = {} - - action = cleaned_data.get("action") - remark = cleaned_data.get("remark") - source_prefix = cleaned_data.get("source_prefix") - source_ports = cleaned_data.get("source_ports") - destination_prefix = cleaned_data.get("destination_prefix") - destination_ports = cleaned_data.get("destination_ports") - protocol = cleaned_data.get("protocol") - - if action == "remark": - if not remark: - error_message["remark"] = [error_message_no_remark] - if source_prefix: - error_message["source_prefix"] = [error_message_action_remark_source_prefix_set] - if source_ports: - error_message["source_ports"] = ["Action is set to remark, Source Ports CANNOT be set."] - if destination_prefix: - error_message["destination_prefix"] = ["Action is set to remark, Destination Prefix CANNOT be set."] - if destination_ports: - error_message["destination_ports"] = ["Action is set to remark, Destination Ports CANNOT be set."] - if protocol: - error_message["protocol"] = ["Action is set to remark, Protocol CANNOT be set."] - elif remark: - error_message["remark"] = [error_message_remark_without_action_remark] - - if error_message: - raise ValidationError(error_message) diff --git a/netbox_acls/models/access_list_rules.py b/netbox_acls/models/access_list_rules.py index 56567204..442e586c 100644 --- a/netbox_acls/models/access_list_rules.py +++ b/netbox_acls/models/access_list_rules.py @@ -4,8 +4,10 @@ from django.apps import apps from django.contrib.postgres.fields import ArrayField +from django.core.exceptions import ValidationError from django.db import models from django.urls import reverse +from django.utils.translation import gettext_lazy as _ from netbox.models import NetBoxModel from ..choices import ACLProtocolChoices, ACLRuleActionChoices, ACLTypeChoices @@ -17,6 +19,29 @@ "ACLExtendedRule", ) +# Error message when the action is 'remark', but no remark is provided. +ERROR_MESSAGE_NO_REMARK = _("When the action is 'remark', a remark is required.") + +# Error message when the action is 'remark', but the source_prefix is set. +ERROR_MESSAGE_ACTION_REMARK_SOURCE_PREFIX_SET = _("When the action is 'remark', the Source Prefix must not be set.") + +# Error message when the action is 'remark', but the source_ports are set. +ERROR_MESSAGE_ACTION_REMARK_SOURCE_PORTS_SET = _("When the action is 'remark', Source Ports must not be set.") + +# Error message when the action is 'remark', but the destination_prefix is set. +ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PREFIX_SET = _( + "When the action is 'remark', the Destination Prefix must not be set." +) + +# Error message when the action is 'remark', but the destination_ports are set. +ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PORTS_SET = _("When the action is 'remark', Destination Ports must not be set.") + +# Error message when the action is 'remark', but the protocol is set. +ERROR_MESSAGE_ACTION_REMARK_PROTOCOL_SET = _("When the action is 'remark', Protocol must not be set.") + +# Error message when a remark is provided, but the action is not set to 'remark'. +ERROR_MESSAGE_REMARK_WITHOUT_ACTION_REMARK = _("A remark cannot be set unless the action is 'remark'.") + class ACLRule(NetBoxModel): """ @@ -112,6 +137,31 @@ class Meta(ACLRule.Meta): verbose_name = "ACL Standard Rule" verbose_name_plural = "ACL Standard Rules" + def clean(self): + """ + Validate the ACL Standard Rule inputs. + + If the action is 'remark', then the remark field must be provided (non-empty), + and the source_prefix field must be empty. + Conversely, if the remark field is provided, the action must be set to 'remark'. + """ + + super().clean() + errors = {} + + # Validate that only the remark field is filled + if self.action == ACLRuleActionChoices.ACTION_REMARK: + if not self.remark: + errors["remark"] = ERROR_MESSAGE_NO_REMARK + if self.source_prefix: + errors["source_prefix"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_PREFIX_SET + # Validate that the action is "remark", when the remark field is provided + elif self.remark: + errors["remark"] = ERROR_MESSAGE_REMARK_WITHOUT_ACTION_REMARK + + if errors: + raise ValidationError(errors) + class ACLExtendedRule(ACLRule): """ @@ -176,3 +226,41 @@ class Meta(ACLRule.Meta): verbose_name = "ACL Extended Rule" verbose_name_plural = "ACL Extended Rules" + + def clean(self): + """ + Validate the ACL Extended Rule inputs. + + When the action is 'remark', the remark field must be provided (non-empty), + and the following fields must be empty: + - source_prefix + - source_ports + - destination_prefix + - destination_ports + - protocol + + Conversely, if a remark is provided, the action must be set to 'remark'. + """ + super().clean() + errors = {} + + # Validate that only the remark field is filled + if self.action == ACLRuleActionChoices.ACTION_REMARK: + if not self.remark: + errors["remark"] = ERROR_MESSAGE_NO_REMARK + if self.source_prefix: + errors["source_prefix"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_PREFIX_SET + if self.source_ports: + errors["source_ports"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_PORTS_SET + if self.destination_prefix: + errors["destination_prefix"] = ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PREFIX_SET + if self.destination_ports: + errors["destination_ports"] = ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PORTS_SET + if self.protocol: + errors["protocol"] = ERROR_MESSAGE_ACTION_REMARK_PROTOCOL_SET + # Validate that the action is "remark", when the remark field is provided + elif self.remark: + errors["remark"] = ERROR_MESSAGE_REMARK_WITHOUT_ACTION_REMARK + + if errors: + raise ValidationError(errors) From df0491ebad3fafe78d21cfc586ab160d1fd10d74 Mon Sep 17 00:00:00 2001 From: Martin Hauser Date: Thu, 27 Mar 2025 23:23:42 +0100 Subject: [PATCH 15/16] chore(tests): Add ACLStandardRule test coverage Adds comprehensive tests for the ACLStandardRule model. Tests cover rule creation scenarios, validation constraints, and edge cases, ensuring thorough model behavior validation. Fixes #47 --- .../tests/models/test_standardrules.py | 215 +++++++++++++++++- 1 file changed, 214 insertions(+), 1 deletion(-) diff --git a/netbox_acls/tests/models/test_standardrules.py b/netbox_acls/tests/models/test_standardrules.py index ab68401b..dcad4968 100644 --- a/netbox_acls/tests/models/test_standardrules.py +++ b/netbox_acls/tests/models/test_standardrules.py @@ -1,3 +1,8 @@ +from django.core.exceptions import ValidationError + +from netbox_acls.choices import ACLTypeChoices +from netbox_acls.models import AccessList, ACLStandardRule + from .base import BaseTestCase @@ -6,4 +11,212 @@ class TestACLStandardRule(BaseTestCase): Test ACLStandardRule model. """ - # TODO: Develop tests for ACLStandardRule model + @classmethod + def setUpTestData(cls): + """ + Extend BaseTestCase's setUpTestData() to create additional data for testing. + """ + super().setUpTestData() + + cls.acl_type = ACLTypeChoices.TYPE_STANDARD + cls.default_action = "deny" + + # AccessLists + cls.standard_acl1 = AccessList.objects.create( + name="STANDARD_ACL", + assigned_object=cls.device1, + type=cls.acl_type, + default_action=cls.default_action, + comments="STANDARD_ACL", + ) + cls.standard_acl2 = AccessList.objects.create( + name="STANDARD_ACL", + assigned_object=cls.virtual_machine1, + type=cls.acl_type, + default_action=cls.default_action, + comments="STANDARD_ACL", + ) + + def test_acl_standard_rule_creation_success(self): + """ + Test that ACLStandardRule creation passes validation. + """ + created_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action="permit", + remark="", + source_prefix=None, + description="Created rule with any source prefix", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLStandardRule), True) + self.assertEqual(created_rule.index, 10) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.description, "Created rule with any source prefix") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_standard_rule_source_prefix_creation_success(self): + """ + Test that ACLStandardRule with source prefix creation passes validation. + """ + created_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=20, + action="permit", + remark="", + source_prefix=self.prefix1, + description="Created rule with source prefix", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLStandardRule), True) + self.assertEqual(created_rule.index, 20) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.description, "Created rule with source prefix") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_standard_rule_remark_creation_success(self): + """ + Test that ACLStandardRule with remark creation passes validation. + """ + created_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=30, + action="remark", + remark="Test remark", + source_prefix=None, + description="Created rule with remark", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLStandardRule), True) + self.assertEqual(created_rule.index, 30) + self.assertEqual(created_rule.action, "remark") + self.assertEqual(created_rule.remark, "Test remark") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.description, "Created rule with remark") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_access_list_extended_to_acl_standard_rule_assignment_fail(self): + """ + Test that Extended Access List cannot be assigned to ACLStandardRule. + """ + extended_acl1 = AccessList.objects.create( + name="EXTENDED_ACL", + assigned_object=self.device1, + type=ACLTypeChoices.TYPE_EXTENDED, + default_action=self.default_action, + comments="EXTENDED_ACL", + ) + standard_rule = ACLStandardRule( + access_list=extended_acl1, + index=30, + action="remark", + remark="Test remark", + source_prefix=None, + description="Created rule with remark", + ) + with self.assertRaises(ValidationError): + standard_rule.full_clean() + + def test_duplicate_index_per_acl_fail(self): + """ + Test that the rule index must be unique per AccessList. + """ + params = { + "access_list": self.standard_acl1, + "index": 10, + "action": "permit", + } + rule_1 = ACLStandardRule(**params) + rule_1.full_clean() + rule_1.save() + rule_2 = ACLStandardRule(**params) + with self.assertRaises(ValidationError): + rule_2.full_clean() + + def test_acl_standard_rule_action_permit_with_remark_fail(self): + """ + Test that ACLStandardRule with action 'permit' and remark fails validation. + """ + invalid_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action="permit", + remark="Remark", + source_prefix=None, + description="Invalid rule with action 'permit' and remark", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_standard_rule_action_remark_with_no_remark_fail(self): + """ + Test that ACLStandardRule with action 'remark' and without remark fails validation. + """ + invalid_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + description="Invalid rule with action 'remark' and without remark", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_standard_rule_action_remark_with_source_prefix_fail(self): + """ + Test that ACLStandardRule with action 'remark' and source prefix fails validation. + """ + invalid_rule = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action="remark", + remark="", + source_prefix=self.prefix1, + description="Invalid rule with action 'remark' and source prefix", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_valid_acl_rule_action_choices(self): + """ + Test ACLStandardRule action choices using VALID choices. + """ + valid_acl_rule_action_choices = ["deny", "permit", "remark"] + + for action_choice in valid_acl_rule_action_choices: + valid_acl_rule_action = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action=action_choice, + remark="Remark" if action_choice == "remark" else None, + description=f"VALID ACL RULE ACTION CHOICES USED: action={action_choice}", + ) + valid_acl_rule_action.full_clean() + + def test_invalid_acl_rule_action_choices(self): + """ + Test ACLStandardRule action choices using INVALID choices. + """ + invalid_acl_rule_action_choice = "both" + + invalid_acl_rule_action = ACLStandardRule( + access_list=self.standard_acl1, + index=10, + action=invalid_acl_rule_action_choice, + description=f"INVALID ACL RULE ACTION CHOICES USED: action={invalid_acl_rule_action_choice}", + ) + + with self.assertRaises(ValidationError): + invalid_acl_rule_action.full_clean() From 3fa1506a7258a2ed6fcab3b3afa5c820955efac6 Mon Sep 17 00:00:00 2001 From: Martin Hauser Date: Fri, 28 Mar 2025 23:07:37 +0100 Subject: [PATCH 16/16] chore(tests): Add ACLExtendedRule test coverage Introduces comprehensive test cases for the ACLExtendedRule model. These cover rule creation, validation, and edge cases such as duplicate indices and invalid combinations of parameters. Fixes #47 --- .../tests/models/test_extendedrules.py | 530 +++++++++++++++++- 1 file changed, 528 insertions(+), 2 deletions(-) diff --git a/netbox_acls/tests/models/test_extendedrules.py b/netbox_acls/tests/models/test_extendedrules.py index 1efc248d..b84813b1 100644 --- a/netbox_acls/tests/models/test_extendedrules.py +++ b/netbox_acls/tests/models/test_extendedrules.py @@ -1,9 +1,535 @@ +from django.core.exceptions import ValidationError + +from netbox_acls.choices import ACLProtocolChoices, ACLTypeChoices +from netbox_acls.models import AccessList, ACLExtendedRule + from .base import BaseTestCase -class ACLExtendedRule(BaseTestCase): +class TestACLExtendedRule(BaseTestCase): """ Test ACLExtendedRule model. """ - # TODO: Develop tests for ACLExtendedRule model + @classmethod + def setUpTestData(cls): + """ + Extend BaseTestCase's setUpTestData() to create additional data for testing. + """ + super().setUpTestData() + + cls.acl_type = ACLTypeChoices.TYPE_EXTENDED + cls.default_action = "deny" + cls.protocol = ACLProtocolChoices.PROTOCOL_TCP + + # AccessLists + cls.extended_acl1 = AccessList.objects.create( + name="EXTENDED_ACL", + assigned_object=cls.device1, + type=cls.acl_type, + default_action=cls.default_action, + comments="EXTENDED_ACL", + ) + cls.extended_acl2 = AccessList.objects.create( + name="EXTENDED_ACL", + assigned_object=cls.virtual_machine1, + type=cls.acl_type, + default_action=cls.default_action, + comments="EXTENDED_ACL", + ) + + def test_acl_extended_rule_creation_success(self): + """ + Test that ACLExtendedRule creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="permit", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description=( + "Created rule with any source prefix, any source port, " + "any destination prefix, any destination port, and any protocol." + ), + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 10) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, None) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, None) + self.assertEqual( + created_rule.description, + ( + "Created rule with any source prefix, any source port, " + "any destination prefix, any destination port, and any protocol." + ), + ) + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_source_prefix_creation_success(self): + """ + Test that ACLExtendedRule with source prefix creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=20, + action="permit", + remark="", + source_prefix=self.prefix1, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Created rule with source prefix", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 20) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, None) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, None) + self.assertEqual(created_rule.description, "Created rule with source prefix") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_source_ports_creation_success(self): + """ + Test that ACLExtendedRule with source ports creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=30, + action="permit", + remark="", + source_prefix=self.prefix1, + source_ports=[22, 443], + destination_prefix=None, + destination_ports=None, + protocol=self.protocol, + description="Created rule with source ports", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 30) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.source_ports, [22, 443]) + self.assertEqual(created_rule.destination_prefix, None) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, self.protocol) + self.assertEqual(created_rule.description, "Created rule with source ports") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_destination_prefix_creation_success(self): + """ + Test that ACLExtendedRule with destination prefix creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=40, + action="permit", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=self.prefix1, + destination_ports=None, + protocol=None, + description="Created rule with destination prefix", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 40) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, self.prefix1) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, None) + self.assertEqual(created_rule.description, "Created rule with destination prefix") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_destination_ports_creation_success(self): + """ + Test that ACLExtendedRule with destination ports creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=50, + action="permit", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=self.prefix1, + destination_ports=[22, 443], + protocol=self.protocol, + description="Created rule with destination ports", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 50) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, self.prefix1) + self.assertEqual(created_rule.destination_ports, [22, 443]) + self.assertEqual(created_rule.protocol, self.protocol) + self.assertEqual(created_rule.description, "Created rule with destination ports") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_icmp_protocol_creation_success(self): + """ + Test that ACLExtendedRule with ICMP protocol creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=60, + action="permit", + remark="", + source_prefix=self.prefix1, + source_ports=None, + destination_prefix=self.prefix2, + destination_ports=None, + protocol=ACLProtocolChoices.PROTOCOL_ICMP, + description="Created rule with ICMP protocol", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 60) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, self.prefix2) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, ACLProtocolChoices.PROTOCOL_ICMP) + self.assertEqual(created_rule.description, "Created rule with ICMP protocol") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_complete_params_creation_success(self): + """ + Test that ACLExtendedRule with complete parameters creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=70, + action="permit", + remark="", + source_prefix=self.prefix1, + source_ports=[4000, 5000], + destination_prefix=self.prefix2, + destination_ports=[22, 443], + protocol=self.protocol, + description="Created rule with complete parameters", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 70) + self.assertEqual(created_rule.action, "permit") + self.assertEqual(created_rule.remark, "") + self.assertEqual(created_rule.source_prefix, self.prefix1) + self.assertEqual(created_rule.source_ports, [4000, 5000]) + self.assertEqual(created_rule.destination_prefix, self.prefix2) + self.assertEqual(created_rule.destination_ports, [22, 443]) + self.assertEqual(created_rule.protocol, self.protocol) + self.assertEqual(created_rule.description, "Created rule with complete parameters") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_acl_extended_rule_remark_creation_success(self): + """ + Test that ACLExtendedRule with remark creation passes validation. + """ + created_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=80, + action="remark", + remark="Test remark", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Created rule with remark", + ) + created_rule.full_clean() + + self.assertTrue(isinstance(created_rule, ACLExtendedRule), True) + self.assertEqual(created_rule.index, 80) + self.assertEqual(created_rule.action, "remark") + self.assertEqual(created_rule.remark, "Test remark") + self.assertEqual(created_rule.source_prefix, None) + self.assertEqual(created_rule.source_ports, None) + self.assertEqual(created_rule.destination_prefix, None) + self.assertEqual(created_rule.destination_ports, None) + self.assertEqual(created_rule.protocol, None) + self.assertEqual(created_rule.description, "Created rule with remark") + self.assertEqual(isinstance(created_rule.access_list, AccessList), True) + self.assertEqual(created_rule.access_list.type, self.acl_type) + + def test_access_list_standard_to_acl_extended_rule_assignment_fail(self): + """ + Test that Standard Access List cannot be assigned to ACLExtendedRule. + """ + standard_acl1 = AccessList.objects.create( + name="STANDARD_ACL", + assigned_object=self.device1, + type=ACLTypeChoices.TYPE_STANDARD, + default_action=self.default_action, + comments="STANDARD_ACL", + ) + extended_rule = ACLExtendedRule( + access_list=standard_acl1, + index=80, + action="remark", + remark="Test remark", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Created rule with remark", + ) + with self.assertRaises(ValidationError): + extended_rule.full_clean() + + def test_duplicate_index_per_acl_fail(self): + """ + Test that the rule index must be unique per AccessList. + """ + params = { + "access_list": self.extended_acl1, + "index": 10, + "action": "permit", + } + rule_1 = ACLExtendedRule(**params) + rule_1.full_clean() + rule_1.save() + rule_2 = ACLExtendedRule(**params) + with self.assertRaises(ValidationError): + rule_2.full_clean() + + def test_acl_extended_rule_action_permit_with_remark_fail(self): + """ + Test that ACLExtendedRule with action 'permit' and remark fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="permit", + remark="Remark", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Invalid rule with action 'permit' and remark", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_no_remark_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and without remark fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Invalid rule with action 'remark' and without remark", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_source_prefix_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and source prefix fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=self.prefix1, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=None, + description="Invalid rule with action 'remark' and source prefix", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_source_ports_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and source ports fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=self.prefix1, + source_ports=[80, 443], + destination_prefix=None, + destination_ports=None, + protocol=ACLProtocolChoices.PROTOCOL_TCP, + description="Invalid rule with action 'remark' and source ports", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_destination_prefix_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and destination prefix fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=self.prefix1, + destination_ports=None, + protocol=None, + description="Invalid rule with action 'remark' and destination prefix", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_destination_ports_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and destination ports fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=self.prefix1, + destination_ports=[80, 443], + protocol=ACLProtocolChoices.PROTOCOL_TCP, + description="Invalid rule with action 'remark' and destination ports", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_acl_extended_rule_action_remark_with_protocol_fail(self): + """ + Test that ACLExtendedRule with action 'remark' and protocol fails validation. + """ + invalid_rule = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action="remark", + remark="", + source_prefix=None, + source_ports=None, + destination_prefix=None, + destination_ports=None, + protocol=ACLProtocolChoices.PROTOCOL_ICMP, + description="Invalid rule with action 'remark' and ICMP protocol", + ) + with self.assertRaises(ValidationError): + invalid_rule.full_clean() + + def test_valid_acl_rule_action_choices(self): + """ + Test ACLExtendedRule action choices using VALID choices. + """ + valid_acl_rule_action_choices = ["deny", "permit", "remark"] + + for action_choice in valid_acl_rule_action_choices: + valid_acl_rule_action = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action=action_choice, + remark="Remark" if action_choice == "remark" else None, + description=f"VALID ACL RULE ACTION CHOICES USED: action={action_choice}", + ) + valid_acl_rule_action.full_clean() + + def test_invalid_acl_rule_action_choices(self): + """ + Test ACLExtendedRule action choices using INVALID choices. + """ + invalid_acl_rule_action_choice = "both" + + invalid_acl_rule_action = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action=invalid_acl_rule_action_choice, + description=f"INVALID ACL RULE ACTION CHOICES USED: action={invalid_acl_rule_action_choice}", + ) + + with self.assertRaises(ValidationError): + invalid_acl_rule_action.full_clean() + + def test_valid_acl_rule_protocol_choices(self): + """ + Test ACLExtendedRule protocol choices using VALID choices. + """ + valid_acl_rule_protocol_choices = ["icmp", "tcp", "udp"] + + for protocol_choice in valid_acl_rule_protocol_choices: + valid_acl_rule_protocol = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + action=self.default_action, + protocol=protocol_choice, + description=f"VALID ACL RULE PROTOCOL CHOICES USED: protocol={protocol_choice}", + ) + valid_acl_rule_protocol.full_clean() + + def test_invalid_acl_rule_protocol_choices(self): + """ + Test ACLExtendedRule protocol choices using INVALID choices. + """ + invalid_acl_rule_protocol_choice = "ethernet" + + invalid_acl_rule_protocol = ACLExtendedRule( + access_list=self.extended_acl1, + index=10, + protocol=invalid_acl_rule_protocol_choice, + description=f"INVALID ACL RULE PROTOCOL CHOICES USED: protocol={invalid_acl_rule_protocol_choice}", + ) + + with self.assertRaises(ValidationError): + invalid_acl_rule_protocol.full_clean()