8
8
import re
9
9
import sys
10
10
import bz2
11
- import pathlib
12
11
import requests
12
+ from datetime import datetime , timedelta
13
+ from pathlib import Path
13
14
15
+ from intelmq import VAR_STATE_PATH
14
16
from intelmq .lib .bot import ExpertBot
15
17
from intelmq .lib .exceptions import MissingDependencyError
16
18
from intelmq .lib .utils import get_bots_settings , create_request_session
25
27
26
28
class ASNLookupExpertBot (ExpertBot ):
27
29
"""Add ASN and netmask information from a local BGP dump"""
28
- database = None # TODO: should be pathlib.Path
30
+ database : str = f' { VAR_STATE_PATH } asn_lookup/ipasn.dat' # TODO: should be pathlib.Path
29
31
autoupdate_cached_database : bool = True # Activate/deactivate update-database functionality
30
32
31
33
def init (self ):
@@ -35,11 +37,11 @@ def init(self):
35
37
try :
36
38
self ._database = pyasn .pyasn (self .database )
37
39
except OSError :
38
- self .logger .error ("pyasn data file does not exist or could not be "
39
- "accessed in %r." , self .database )
40
- self .logger .error ("Read 'bots/experts/asn_lookup/README' and "
41
- "follow the procedure." )
40
+ raise ValueError (f"pyasn data file does not exist or could not be accessed in { self .database !r} . " ,
41
+ "Please see https://docs.intelmq.org/latest/user/bots/#asn-lookup" )
42
42
self .stop ()
43
+ if not Path (self .database ).is_file ():
44
+ raise ValueError ('Database file does not exist or is not a file.' )
43
45
44
46
def process (self ):
45
47
event = self .receive_message ()
@@ -66,12 +68,20 @@ def process(self):
66
68
67
69
@staticmethod
68
70
def check (parameters ):
69
- if not os .path .exists (parameters .get ('database' , '' )):
70
- return [["error" , "File given as parameter 'database' does not exist." ]]
71
+ database_path = Path (parameters .get ('database' , '' ))
72
+ if not database_path .exists ():
73
+ return [["warning" , f"File given as parameter 'database' ({ database_path !s} ) does not exist. You may need to trigger first downloading manually. See: https://docs.intelmq.org/latest/user/bots/#asn-lookup." ]]
74
+ elif not database_path .is_file ():
75
+ return [["error" , f"Parameter 'database' ({ database_path !s} ) exists, but is not a file." ]]
71
76
try :
72
77
pyasn .pyasn (parameters ['database' ])
73
78
except Exception as exc :
74
- return [["error" , "Error reading database: %r." % exc ]]
79
+ return [["error" , f"Error reading database ({ database_path !s} ): { exc !r} ." ]]
80
+
81
+ # Check the age of the database file
82
+ # use local time zone for both time operations
83
+ if datetime .now () - datetime .fromtimestamp (database_path .stat ().st_mtime ) < timedelta (weeks = 1 ):
84
+ return [["warning" , f"Database ({ database_path !s} ) is older than one week. Check the auto update, see: https://docs.intelmq.org/latest/user/bots/#asn-lookup." ]]
75
85
76
86
@classmethod
77
87
def run (cls , parsed_args = None ):
@@ -112,6 +122,12 @@ def update_database(cls, verbose=False):
112
122
if pyasn is None :
113
123
raise MissingDependencyError ("pyasn" )
114
124
125
+ for database_path in set (bots .values ()):
126
+ if not Path (database_path ).is_file ():
127
+ raise ValueError ('Database file does not exist or is not a file.' )
128
+ elif not os .access (database_path , os .W_OK ):
129
+ raise ValueError ('Database file is not writeable.' )
130
+
115
131
try :
116
132
if verbose :
117
133
print ("Searching for the latest database update..." )
@@ -159,7 +175,7 @@ def update_database(cls, verbose=False):
159
175
prefixes = pyasn .mrtx .parse_mrt_file (archive , print_progress = False , skip_record_on_error = True )
160
176
161
177
for database_path in set (bots .values ()):
162
- database_dir = pathlib . Path (database_path ).parent
178
+ database_dir = Path (database_path ).parent
163
179
database_dir .mkdir (parents = True , exist_ok = True )
164
180
pyasn .mrtx .dump_prefixes_to_file (prefixes , database_path )
165
181
0 commit comments