From e7908d28403da97a7f176f8c22b7b902ae4c27d7 Mon Sep 17 00:00:00 2001 From: Johanna England Date: Fri, 21 Jun 2024 14:47:22 +0200 Subject: [PATCH 1/2] Add API command to create fake events --- changelog.d/268.added.md | 1 + src/zino/api/legacy.py | 67 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 changelog.d/268.added.md diff --git a/changelog.d/268.added.md b/changelog.d/268.added.md new file mode 100644 index 00000000..749ce748 --- /dev/null +++ b/changelog.d/268.added.md @@ -0,0 +1 @@ +Add API command to create fake events \ No newline at end of file diff --git a/src/zino/api/legacy.py b/src/zino/api/legacy.py index 048a04b9..1d2ea557 100644 --- a/src/zino/api/legacy.py +++ b/src/zino/api/legacy.py @@ -12,7 +12,7 @@ from datetime import datetime, timedelta, timezone from functools import wraps from pathlib import Path -from typing import TYPE_CHECKING, Callable, List, NamedTuple, Optional, Union +from typing import TYPE_CHECKING, Callable, List, NamedTuple, Optional, Union, get_args from zino import version from zino.api import auth @@ -20,6 +20,10 @@ from zino.scheduler import get_scheduler from zino.state import ZinoState, config from zino.statemodels import ( + AlarmEvent, + AlarmType, + BFDEvent, + BGPEvent, ClosedEventError, DeviceMaintenance, Event, @@ -27,11 +31,14 @@ MatchType, PlannedMaintenance, PmType, + PortStateEvent, PortStateMaintenance, + ReachabilityEvent, ) from zino.tasks import run_all_tasks from zino.tasks.linkstatetask import LinkStateTask from zino.time import now +from zino.utils import parse_ip if TYPE_CHECKING: from zino.api.server import ZinoServer @@ -620,3 +627,61 @@ async def do_raiseerror(self): that exceptions that go unhandled by a command responder is handled by the protocol engine. """ 1 / 0 # noqa + + @requires_authentication + async def do_fakeevent(self, device_name: str, subindex: Optional[str], event_class_name: str, event_state: str): + """Implements an FAKEEVENT command that did not exist in the Zino 1 protocol. This is just used for testing the + frontend. + """ + event_subclasses = {event_c.model_fields["type"].default: event_c for event_c in Event.__subclasses__()} + event_class_name = event_class_name.lower() + + if event_class_name not in event_subclasses.keys(): + return self._respond_error( + f"Given event class type not in available event class types: {event_subclasses.keys()}" + ) + + try: + event_state = EventState(event_state) + except ValueError: + return self._respond_error( + f"Given event state {event_state} not in available event states: {[state.value for state in EventState]}" + ) + + events = self._state.events + event_class = event_subclasses[event_class_name] + + if event_class == AlarmEvent and subindex not in get_args(AlarmType): + return self._respond_error( + f"Given subindex {subindex} is not a valid alarm type. Valid alarm types: {get_args(AlarmType)}" + ) + if event_class in [BFDEvent, PortStateEvent]: + try: + subindex = int(subindex) + except ValueError: + return self._respond_error(f"Given subindex {subindex} is not a number.") + if event_class == BGPEvent: + try: + subindex = parse_ip(subindex) + except ValueError: + return self._respond_error(f"Given subindex {subindex} is not an IP address.") + if event_class == ReachabilityEvent: + subindex = None + + if events.get(device_name=device_name, subindex=subindex, event_class=event_class): + return self._respond_error("An event with the given parameters already exists.") + + event = events.create_event(device_name=device_name, subindex=subindex, event_class=event_class) + + if event_class == AlarmEvent: + event.alarm_type = subindex + if event_class in [BFDEvent, PortStateEvent]: + event.ifindex = subindex + if event_class == BGPEvent: + event.remote_addr = subindex + + event.set_state(event_state) + + events.commit(event=event) + + return self._respond_ok(f"event created with id {event.id}") From 3a33f0cebd661796a681ec6ba1cc71431342e16a Mon Sep 17 00:00:00 2001 From: Johanna England Date: Fri, 21 Jun 2024 14:47:42 +0200 Subject: [PATCH 2/2] Add tests for fake event API command --- tests/api/legacy_test.py | 75 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/tests/api/legacy_test.py b/tests/api/legacy_test.py index 10f00a43..1c38a10c 100644 --- a/tests/api/legacy_test.py +++ b/tests/api/legacy_test.py @@ -28,6 +28,8 @@ ) from zino.time import now +DEVICE_NAME = "example-gw.example.org" + class TestZino1BaseServerProtocol: def test_should_init_without_error(self): @@ -1269,6 +1271,79 @@ async def test_when_authenticated_should_not_output_non_matching_ports( ) +class TestZino1TestProtocolFakeEventCommand: + @pytest.mark.asyncio + @pytest.mark.parametrize( + "subindex,event_class", + [ + ("yellow", "alarm"), + (123, "bfd"), + ("127.0.0.1", "bgp"), + (123, "portstate"), + ("None", "reachability"), + ], + ) + async def test_fakevent_should_create_event(self, subindex, event_class): + protocol = ZinoTestProtocol() + fake_transport = Mock() + protocol.connection_made(fake_transport) + protocol.user = "foo" + fake_transport.write = Mock() + command = f"FAKEEVENT {DEVICE_NAME} {subindex} {event_class} ignored" + await protocol.message_received(command) + + assert fake_transport.write.called + response = fake_transport.write.call_args[0][0].decode("utf-8") + assert response.startswith("200 ") + + event_id = re.search(r"event created with id (?P\d+)", response).group("id") + assert event_id + assert protocol._state.events.events.get(int(event_id), None) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "subindex,event_class,event_state", + [ + (123, "invalid-event-type", "ignored"), + ("blue", "alarm", "ignored"), + (123, "portstate", "invalid-state"), + ("abc", "portstate", "ignored"), + ("abc", "bgp", "ignored"), + ], + ) + async def test_fakevent_should_fail_on_invalid_input(self, subindex, event_class, event_state): + protocol = ZinoTestProtocol() + fake_transport = Mock() + protocol.connection_made(fake_transport) + protocol.user = "foo" + fake_transport.write = Mock() + command = f"FAKEEVENT {DEVICE_NAME} {subindex} {event_class} {event_state}" + await protocol.message_received(command) + + assert fake_transport.write.called + response = fake_transport.write.call_args[0][0].decode("utf-8") + assert response.startswith("500 ") + + @pytest.mark.asyncio + async def test_fakevent_should_fail_if_event_with_given_index_exists_already(self): + protocol = ZinoTestProtocol() + fake_transport = Mock() + protocol.connection_made(fake_transport) + protocol.user = "foo" + fake_transport.write = Mock() + + event = ReachabilityEvent(router=DEVICE_NAME, state=EventState.OPEN) + protocol._state.events.commit(event=event) + + command = f"FAKEEVENT {DEVICE_NAME} None reachability open" + await protocol.message_received(command) + + assert fake_transport.write.called + response = fake_transport.write.call_args[0][0].decode("utf-8") + assert response.startswith("500 ") + assert "An event with the given parameters already exists" in response + + def test_requires_authentication_should_set_function_attribute(): @requires_authentication def throwaway():