Skip to content

Commit 0a38f2f

Browse files
westersfsebix
authored andcommitted
Add securitytxt bot
1 parent e86912f commit 0a38f2f

File tree

6 files changed

+201
-0
lines changed

6 files changed

+201
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
wellknown-securitytxt

intelmq/bots/experts/securitytxt/__init__.py

Whitespace-only changes.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
from typing import Optional
2+
3+
import requests
4+
from securitytxt import SecurityTXT
5+
6+
from intelmq.lib.bot import ExpertBot
7+
8+
9+
class SecurityTXTExpertBot(ExpertBot):
10+
"""
11+
A bot for retrieving contact details from a security.txt
12+
"""
13+
"""
14+
url_field: The field where to find the url which should be searched
15+
contact_field: Field in which to place the found contact details
16+
17+
only_email_address: whether to select only email addresses as contact detail (no web urls)
18+
overwrite: whether to override existing data
19+
check_expired / check_canonical: whether to perform checks on expiry date / canonical urls.
20+
"""
21+
url_field: str = "source.reverse_dns"
22+
contact_field: str = "source.abuse_contact"
23+
24+
only_email_address: bool = True
25+
overwrite: bool = True
26+
check_expired: bool = False
27+
check_canonical: bool = False
28+
29+
def init(self):
30+
if not self.url_field or not self.contact_field:
31+
raise AttributeError("Not all required fields are set.")
32+
33+
def process(self):
34+
event = self.receive_message()
35+
36+
try:
37+
self.check_prerequisites(event)
38+
primary_contact = self.get_primary_contact(event.get(self.url_field))
39+
event.add(self.contact_field, primary_contact, overwrite=self.overwrite)
40+
except NotMeetsRequirementsError as e:
41+
self.logger.debug(str(e) + " Skipping event.")
42+
except ContactNotFoundError as e:
43+
self.logger.debug(f"No contact found. {str(e)} Continue.")
44+
45+
self.send_message(event)
46+
self.acknowledge_message()
47+
48+
def check_prerequisites(self, event) -> None:
49+
"""
50+
Check whether this event should be processed by this bot, or can be skipped.
51+
:param event: The event to evaluate.
52+
"""
53+
if not event.get(self.url_field, False):
54+
raise NotMeetsRequirementsError("The URL field is empty.")
55+
if event.get(self.contact_field, False) and not self.overwrite:
56+
raise NotMeetsRequirementsError("All replace values already set.")
57+
58+
def get_primary_contact(self, url: str) -> Optional[str]:
59+
"""
60+
Given a url, get the file, check it's validity and look for contact details. The primary contact details are
61+
returned. If only_email_address is set to True, it will only return email addresses (no urls).
62+
:param url: The URL on which to look for a security.txt file
63+
:return: The contact information
64+
:raises ContactNotFoundError: if contact cannot be found
65+
"""
66+
try:
67+
securitytxt = SecurityTXT.from_url(url)
68+
if not self.security_txt_is_valid(securitytxt):
69+
raise ContactNotFoundError("SecurityTXT File not valid.")
70+
for contact in securitytxt.contact:
71+
if not self.only_email_address or SecurityTXTExpertBot.is_email_address(contact):
72+
return contact
73+
raise ContactNotFoundError("No contact details found in SecurityTXT.")
74+
except (FileNotFoundError, AttributeError, requests.exceptions.RequestException):
75+
raise ContactNotFoundError("SecurityTXT file could not be found or parsed.")
76+
77+
def security_txt_is_valid(self, securitytxt: SecurityTXT):
78+
"""
79+
Determine whether a security.txt file is valid according to parameters of the bot.
80+
:param securitytxt: The securityTXT object
81+
:return: Whether the securitytxt is valid.
82+
"""
83+
return (not self.check_expired or not securitytxt.expired) and \
84+
(not self.check_canonical or securitytxt.canonical_url())
85+
86+
@staticmethod
87+
def is_email_address(contact: str):
88+
"""
89+
Determine whether the argument is an email address
90+
:param contact: the contact
91+
:return: whether contact is email address
92+
"""
93+
return 'mailto:' in contact or '@' in contact
94+
95+
96+
class NotMeetsRequirementsError(Exception):
97+
pass
98+
99+
100+
class ContactNotFoundError(Exception):
101+
pass
102+
103+
104+
BOT = SecurityTXTExpertBot
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
requests_mock

intelmq/tests/bots/experts/securitytxt/__init__.py

Whitespace-only changes.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# SPDX-License-Identifier: AGPL-3.0-or-later
2+
3+
# -*- coding: utf-8 -*-
4+
"""
5+
Testing the SecurityTXT Expert Bot
6+
"""
7+
8+
import unittest
9+
10+
import requests_mock
11+
12+
import intelmq.lib.test as test
13+
from intelmq.bots.experts.securitytxt.expert import SecurityTXTExpertBot
14+
15+
EXAMPLE_INPUT_IP = {"__type": "Event",
16+
"source.ip": "192.168.123.123"}
17+
18+
EXPECTED_OUTPUT_IP = {"__type": "Event",
19+
"source.ip": "192.168.123.123",
20+
"source.account": 'test@test.local'}
21+
22+
EXAMPLE_INPUT_FQDN = {"__type": "Event",
23+
"source.fqdn": "test.local"}
24+
25+
EXPECTED_OUTPUT_FQDN = {"__type": "Event",
26+
"source.fqdn": "test.local",
27+
"source.abuse_contact": 'test.local/whitehat'}
28+
29+
EXPECTED_OUTPUT_FQDN_NO_CONTACT = {"__type": "Event",
30+
"source.fqdn": "test.local"}
31+
32+
@requests_mock.Mocker()
33+
class TestSecurityTXTExpertBot(test.BotTestCase, unittest.TestCase):
34+
"""
35+
A TestCase for the SecurityTXT Expert Bot
36+
"""
37+
38+
@classmethod
39+
def set_bot(cls):
40+
cls.bot_reference = SecurityTXTExpertBot
41+
42+
def test_ip(self, m: requests_mock.Mocker):
43+
self._run_generic_test(securitytxt_url=f"https://{EXAMPLE_INPUT_IP['source.ip']}/.well-known/security.txt",
44+
securitytxt=f"Contact: {EXPECTED_OUTPUT_IP['source.account']}",
45+
input_message=EXAMPLE_INPUT_IP,
46+
output_message=EXPECTED_OUTPUT_IP,
47+
config={'url_field': 'source.ip', 'contact_field': 'source.account',
48+
'only_email_address': False},
49+
m=m)
50+
51+
def test_fqdn(self, m: requests_mock.Mocker):
52+
self._run_generic_test(securitytxt_url=f"https://{EXAMPLE_INPUT_FQDN['source.fqdn']}/.well-known/security.txt",
53+
securitytxt=f"Contact: {EXPECTED_OUTPUT_FQDN['source.abuse_contact']}",
54+
input_message=EXAMPLE_INPUT_FQDN,
55+
output_message=EXPECTED_OUTPUT_FQDN,
56+
config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
57+
'only_email_address': False},
58+
m=m)
59+
60+
def test_only_email_address_true(self, m: requests_mock.Mocker):
61+
self._run_generic_test(securitytxt_url=f"https://{EXAMPLE_INPUT_FQDN['source.fqdn']}/.well-known/security.txt",
62+
securitytxt=f"Contact: {EXPECTED_OUTPUT_FQDN['source.abuse_contact']}",
63+
input_message=EXAMPLE_INPUT_FQDN,
64+
output_message=EXPECTED_OUTPUT_FQDN_NO_CONTACT,
65+
config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
66+
'only_email_address': True},
67+
m=m)
68+
69+
def test_expired(self, m: requests_mock.Mocker):
70+
self._run_generic_test(securitytxt_url=f"https://{EXAMPLE_INPUT_FQDN['source.fqdn']}/.well-known/security.txt",
71+
securitytxt=f"Contact: {EXPECTED_OUTPUT_FQDN['source.abuse_contact']}\nExpires: 1900-12-31T18:37:07.000Z",
72+
input_message=EXAMPLE_INPUT_FQDN,
73+
output_message=EXPECTED_OUTPUT_FQDN_NO_CONTACT,
74+
config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
75+
'only_email_address': False, 'check_expired': True},
76+
m=m)
77+
78+
def test_not_expired(self, m: requests_mock.Mocker):
79+
self._run_generic_test(securitytxt_url=f"https://{EXAMPLE_INPUT_FQDN['source.fqdn']}/.well-known/security.txt",
80+
securitytxt=f"Contact: {EXPECTED_OUTPUT_FQDN['source.abuse_contact']}\nExpires: 3000-12-31T18:37:07.000Z",
81+
input_message=EXAMPLE_INPUT_FQDN,
82+
output_message=EXPECTED_OUTPUT_FQDN,
83+
config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
84+
'only_email_address': False, 'check_expired': True},
85+
m=m)
86+
87+
def _run_generic_test(self, m: requests_mock.Mocker, config: dict, securitytxt_url: str, securitytxt: str,
88+
input_message: dict, output_message: dict):
89+
self.sysconfig = config
90+
self.prepare_bot()
91+
m.get(requests_mock.ANY, status_code=404)
92+
m.get(securitytxt_url, text=securitytxt)
93+
self.input_message = input_message
94+
self.run_bot()
95+
self.assertMessageEqual(0, output_message)

0 commit comments

Comments
 (0)