Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/268.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add API command to create fake events
67 changes: 66 additions & 1 deletion src/zino/api/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,33 @@
from datetime import datetime, timedelta, timezone
from functools import wraps
from pathlib import Path
from typing import TYPE_CHECKING, Callable, List, NamedTuple, Optional, Union
from typing import TYPE_CHECKING, Callable, List, NamedTuple, Optional, Union, get_args

from zino import version
from zino.api import auth
from zino.api.notify import Zino1NotificationProtocol
from zino.scheduler import get_scheduler
from zino.state import ZinoState, config
from zino.statemodels import (
AlarmEvent,
AlarmType,
BFDEvent,
BGPEvent,
ClosedEventError,
DeviceMaintenance,
Event,
EventState,
MatchType,
PlannedMaintenance,
PmType,
PortStateEvent,
PortStateMaintenance,
ReachabilityEvent,
)
from zino.tasks import run_all_tasks
from zino.tasks.linkstatetask import LinkStateTask
from zino.time import now
from zino.utils import parse_ip

if TYPE_CHECKING:
from zino.api.server import ZinoServer
Expand Down Expand Up @@ -620,3 +627,61 @@ async def do_raiseerror(self):
that exceptions that go unhandled by a command responder is handled by the protocol engine.
"""
1 / 0 # noqa

@requires_authentication
async def do_fakeevent(self, device_name: str, subindex: Optional[str], event_class_name: str, event_state: str):
"""Implements an FAKEEVENT command that did not exist in the Zino 1 protocol. This is just used for testing the
frontend.
"""
event_subclasses = {event_c.model_fields["type"].default: event_c for event_c in Event.__subclasses__()}
event_class_name = event_class_name.lower()

if event_class_name not in event_subclasses.keys():
return self._respond_error(
f"Given event class type not in available event class types: {event_subclasses.keys()}"
)

try:
event_state = EventState(event_state)
except ValueError:
return self._respond_error(
f"Given event state {event_state} not in available event states: {[state.value for state in EventState]}"
)

events = self._state.events
event_class = event_subclasses[event_class_name]

if event_class == AlarmEvent and subindex not in get_args(AlarmType):
return self._respond_error(
f"Given subindex {subindex} is not a valid alarm type. Valid alarm types: {get_args(AlarmType)}"
)
if event_class in [BFDEvent, PortStateEvent]:
try:
subindex = int(subindex)
except ValueError:
return self._respond_error(f"Given subindex {subindex} is not a number.")
if event_class == BGPEvent:
try:
subindex = parse_ip(subindex)
except ValueError:
return self._respond_error(f"Given subindex {subindex} is not an IP address.")
if event_class == ReachabilityEvent:
subindex = None

if events.get(device_name=device_name, subindex=subindex, event_class=event_class):
return self._respond_error("An event with the given parameters already exists.")

event = events.create_event(device_name=device_name, subindex=subindex, event_class=event_class)

if event_class == AlarmEvent:
event.alarm_type = subindex
if event_class in [BFDEvent, PortStateEvent]:
event.ifindex = subindex
if event_class == BGPEvent:
event.remote_addr = subindex

event.set_state(event_state)

events.commit(event=event)

return self._respond_ok(f"event created with id {event.id}")
75 changes: 75 additions & 0 deletions tests/api/legacy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
)
from zino.time import now

DEVICE_NAME = "example-gw.example.org"


class TestZino1BaseServerProtocol:
def test_should_init_without_error(self):
Expand Down Expand Up @@ -1269,6 +1271,79 @@ async def test_when_authenticated_should_not_output_non_matching_ports(
)


class TestZino1TestProtocolFakeEventCommand:
@pytest.mark.asyncio
@pytest.mark.parametrize(
"subindex,event_class",
[
("yellow", "alarm"),
(123, "bfd"),
("127.0.0.1", "bgp"),
(123, "portstate"),
("None", "reachability"),
],
)
async def test_fakevent_should_create_event(self, subindex, event_class):
protocol = ZinoTestProtocol()
fake_transport = Mock()
protocol.connection_made(fake_transport)
protocol.user = "foo"
fake_transport.write = Mock()
command = f"FAKEEVENT {DEVICE_NAME} {subindex} {event_class} ignored"
await protocol.message_received(command)

assert fake_transport.write.called
response = fake_transport.write.call_args[0][0].decode("utf-8")
assert response.startswith("200 ")

event_id = re.search(r"event created with id (?P<id>\d+)", response).group("id")
assert event_id
assert protocol._state.events.events.get(int(event_id), None)

@pytest.mark.asyncio
@pytest.mark.parametrize(
"subindex,event_class,event_state",
[
(123, "invalid-event-type", "ignored"),
("blue", "alarm", "ignored"),
(123, "portstate", "invalid-state"),
("abc", "portstate", "ignored"),
("abc", "bgp", "ignored"),
],
)
async def test_fakevent_should_fail_on_invalid_input(self, subindex, event_class, event_state):
protocol = ZinoTestProtocol()
fake_transport = Mock()
protocol.connection_made(fake_transport)
protocol.user = "foo"
fake_transport.write = Mock()
command = f"FAKEEVENT {DEVICE_NAME} {subindex} {event_class} {event_state}"
await protocol.message_received(command)

assert fake_transport.write.called
response = fake_transport.write.call_args[0][0].decode("utf-8")
assert response.startswith("500 ")

@pytest.mark.asyncio
async def test_fakevent_should_fail_if_event_with_given_index_exists_already(self):
protocol = ZinoTestProtocol()
fake_transport = Mock()
protocol.connection_made(fake_transport)
protocol.user = "foo"
fake_transport.write = Mock()

event = ReachabilityEvent(router=DEVICE_NAME, state=EventState.OPEN)
protocol._state.events.commit(event=event)

command = f"FAKEEVENT {DEVICE_NAME} None reachability open"
await protocol.message_received(command)

assert fake_transport.write.called
response = fake_transport.write.call_args[0][0].decode("utf-8")
assert response.startswith("500 ")
assert "An event with the given parameters already exists" in response


def test_requires_authentication_should_set_function_attribute():
@requires_authentication
def throwaway():
Expand Down