Skip to content

asn, fake expert: default paths, asn: add checks #2606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ Please refer to the [NEWS](NEWS.md) for a list of changes which have an affect o
- `intelmq.bots.parsers.cymru.parser_cap_program`: Add mapping for TOR and ipv6-icmp protocol (PR#2621 by Mikk Margus Möll).

#### Experts
- `intelmq.bots.experts.asn_lookup.expert`: Print URLs to stdout only in verbose mode (PR#2591 by Sebastian Wagner).
- `intelmq.bots.experts.asn_lookup.expert`:
- Print URLs to stdout only in verbose mode (PR#2591 by Sebastian Wagner).
- Check for database file existence and writability (fixes #2566).
- Use database path matching to installation type (PR#2606 by Sebastian Wagner).
- `intelmq.bots.experts.fake.expert`: Use database path matching to installation type (PR#2606 by Sebastian Wagner).

#### Outputs

Expand Down
36 changes: 26 additions & 10 deletions intelmq/bots/experts/asn_lookup/expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import re
import sys
import bz2
import pathlib
import requests
from datetime import datetime, timedelta
from pathlib import Path

from intelmq import VAR_STATE_PATH
from intelmq.lib.bot import ExpertBot
from intelmq.lib.exceptions import MissingDependencyError
from intelmq.lib.utils import get_bots_settings, create_request_session
Expand All @@ -25,7 +27,7 @@

class ASNLookupExpertBot(ExpertBot):
"""Add ASN and netmask information from a local BGP dump"""
database = None # TODO: should be pathlib.Path
database: str = f'{VAR_STATE_PATH}asn_lookup/ipasn.dat' # TODO: should be pathlib.Path
autoupdate_cached_database: bool = True # Activate/deactivate update-database functionality

def init(self):
Expand All @@ -35,11 +37,11 @@ def init(self):
try:
self._database = pyasn.pyasn(self.database)
except OSError:
self.logger.error("pyasn data file does not exist or could not be "
"accessed in %r.", self.database)
self.logger.error("Read 'bots/experts/asn_lookup/README' and "
"follow the procedure.")
raise ValueError(f"pyasn data file does not exist or could not be accessed in {self.database!r}. ",
"Please see https://docs.intelmq.org/latest/user/bots/#asn-lookup")
self.stop()
if not Path(self.database).is_file():
raise ValueError('Database file does not exist or is not a file.')

def process(self):
event = self.receive_message()
Expand All @@ -66,12 +68,20 @@ def process(self):

@staticmethod
def check(parameters):
if not os.path.exists(parameters.get('database', '')):
return [["error", "File given as parameter 'database' does not exist."]]
database_path = Path(parameters.get('database', ''))
if not database_path.exists():
return [["warning", f"File given as parameter 'database' ({database_path!s}) does not exist. You may need to trigger first downloading manually. See: https://docs.intelmq.org/latest/user/bots/#asn-lookup."]]
elif not database_path.is_file():
return [["error", f"Parameter 'database' ({database_path!s}) exists, but is not a file."]]
try:
pyasn.pyasn(parameters['database'])
except Exception as exc:
return [["error", "Error reading database: %r." % exc]]
return [["error", f"Error reading database ({database_path!s}): {exc!r}."]]

# Check the age of the database file
# use local time zone for both time operations
if datetime.now() - datetime.fromtimestamp(database_path.stat().st_mtime) < timedelta(weeks=1):
return [["warning", f"Database ({database_path!s}) is older than one week. Check the auto update, see: https://docs.intelmq.org/latest/user/bots/#asn-lookup."]]

@classmethod
def run(cls, parsed_args=None):
Expand Down Expand Up @@ -112,6 +122,12 @@ def update_database(cls, verbose=False):
if pyasn is None:
raise MissingDependencyError("pyasn")

for database_path in set(bots.values()):
if not Path(database_path).is_file():
raise ValueError('Database file does not exist or is not a file.')
elif not os.access(database_path, os.W_OK):
raise ValueError('Database file is not writeable.')

try:
if verbose:
print("Searching for the latest database update...")
Expand Down Expand Up @@ -159,7 +175,7 @@ def update_database(cls, verbose=False):
prefixes = pyasn.mrtx.parse_mrt_file(archive, print_progress=False, skip_record_on_error=True)

for database_path in set(bots.values()):
database_dir = pathlib.Path(database_path).parent
database_dir = Path(database_path).parent
database_dir.mkdir(parents=True, exist_ok=True)
pyasn.mrtx.dump_prefixes_to_file(prefixes, database_path)

Expand Down
3 changes: 2 additions & 1 deletion intelmq/bots/experts/fake/expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
from json import load as json_load

from intelmq.lib.bot import ExpertBot
from intelmq import VAR_STATE_PATH


class FakeExpertBot(ExpertBot):
"""Add fake data"""

overwrite: bool = False
database: str = None # TODO: should be pathlib.Path
database: str = f'{VAR_STATE_PATH}fake/data.json' # TODO: should be pathlib.Path

def init(self):
with open(self.database) as database:
Expand Down
Loading