Skip to content

Commit 5caa3e0

Browse files
committed
ENH: Add possibility to delay generating MISP Feed
Generating MISP feed on every incoming message slows down processing. The new config option let us decide to save them in batches. Cached events are stored in a cache list in Redis. In addition, a code related to Python 3.6 was removed as we do not support this version any more.
1 parent 96c8c83 commit 5caa3e0

File tree

4 files changed

+135
-47
lines changed

4 files changed

+135
-47
lines changed

intelmq/bots/outputs/misp/output_feed.py

Lines changed: 64 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
# -*- coding: utf-8 -*-
66
import datetime
77
import json
8+
import re
89
from pathlib import Path
910
from uuid import uuid4
10-
import re
1111

1212
from intelmq.lib.bot import OutputBot
1313
from intelmq.lib.exceptions import MissingDependencyError
14+
from intelmq.lib.mixins import CacheMixin
1415
from intelmq.lib.utils import parse_relative
1516

1617
try:
@@ -19,19 +20,14 @@
1920
except ImportError:
2021
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
2122
MISPEvent = None
22-
import_fail_reason = 'import'
23-
except SyntaxError:
24-
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
25-
MISPEvent = None
26-
import_fail_reason = 'syntax'
27-
23+
import_fail_reason = "import"
2824

29-
# NOTE: This module is compatible with Python 3.6+
3025

31-
32-
class MISPFeedOutputBot(OutputBot):
26+
class MISPFeedOutputBot(OutputBot, CacheMixin):
3327
"""Generate an output in the MISP Feed format"""
28+
3429
interval_event: str = "1 hour"
30+
delay_save_event_count: int = None
3531
misp_org_name = None
3632
misp_org_uuid = None
3733
output_dir: str = "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path
@@ -45,13 +41,8 @@ def check_output_dir(dirname):
4541
return True
4642

4743
def init(self):
48-
if MISPEvent is None and import_fail_reason == 'syntax':
49-
raise MissingDependencyError("pymisp",
50-
version='>=2.4.117.3',
51-
additional_text="Python versions below 3.6 are "
52-
"only supported by pymisp <= 2.4.119.1.")
53-
elif MISPEvent is None:
54-
raise MissingDependencyError('pymisp', version='>=2.4.117.3')
44+
if MISPEvent is None:
45+
raise MissingDependencyError("pymisp", version=">=2.4.117.3")
5546

5647
self.current_event = None
5748

@@ -71,59 +62,90 @@ def init(self):
7162
try:
7263
with (self.output_dir / '.current').open() as f:
7364
self.current_file = Path(f.read())
74-
self.current_event = MISPEvent()
75-
self.current_event.load_file(self.current_file)
76-
77-
last_min_time, last_max_time = re.findall('IntelMQ event (.*) - (.*)', self.current_event.info)[0]
78-
last_min_time = datetime.datetime.strptime(last_min_time, '%Y-%m-%dT%H:%M:%S.%f')
79-
last_max_time = datetime.datetime.strptime(last_max_time, '%Y-%m-%dT%H:%M:%S.%f')
80-
if last_max_time < datetime.datetime.now():
81-
self.min_time_current = datetime.datetime.now()
82-
self.max_time_current = self.min_time_current + self.timedelta
83-
self.current_event = None
84-
else:
85-
self.min_time_current = last_min_time
86-
self.max_time_current = last_max_time
65+
66+
if self.current_file.exists():
67+
self.current_event = MISPEvent()
68+
self.current_event.load_file(self.current_file)
69+
70+
last_min_time, last_max_time = re.findall(
71+
"IntelMQ event (.*) - (.*)", self.current_event.info
72+
)[0]
73+
last_min_time = datetime.datetime.strptime(
74+
last_min_time, "%Y-%m-%dT%H:%M:%S.%f"
75+
)
76+
last_max_time = datetime.datetime.strptime(
77+
last_max_time, "%Y-%m-%dT%H:%M:%S.%f"
78+
)
79+
if last_max_time < datetime.datetime.now():
80+
self.min_time_current = datetime.datetime.now()
81+
self.max_time_current = self.min_time_current + self.timedelta
82+
self.current_event = None
83+
else:
84+
self.min_time_current = last_min_time
85+
self.max_time_current = last_max_time
8786
except:
88-
self.logger.exception("Loading current event %s failed. Skipping it.", self.current_event)
87+
self.logger.exception(
88+
"Loading current event %s failed. Skipping it.", self.current_event
89+
)
8990
self.current_event = None
9091
else:
9192
self.min_time_current = datetime.datetime.now()
9293
self.max_time_current = self.min_time_current + self.timedelta
9394

9495
def process(self):
95-
9696
if not self.current_event or datetime.datetime.now() > self.max_time_current:
9797
self.min_time_current = datetime.datetime.now()
9898
self.max_time_current = self.min_time_current + self.timedelta
9999
self.current_event = MISPEvent()
100-
self.current_event.info = ('IntelMQ event {begin} - {end}'
101-
''.format(begin=self.min_time_current.isoformat(),
102-
end=self.max_time_current.isoformat()))
100+
self.current_event.info = "IntelMQ event {begin} - {end}" "".format(
101+
begin=self.min_time_current.isoformat(),
102+
end=self.max_time_current.isoformat(),
103+
)
103104
self.current_event.set_date(datetime.date.today())
104105
self.current_event.Orgc = self.misp_org
105106
self.current_event.uuid = str(uuid4())
106-
self.current_file = self.output_dir / f'{self.current_event.uuid}.json'
107-
with (self.output_dir / '.current').open('w') as f:
107+
self.current_file = self.output_dir / f"{self.current_event.uuid}.json"
108+
with (self.output_dir / ".current").open("w") as f:
108109
f.write(str(self.current_file))
109110

111+
# On startup or when timeout occurs, clean the queue to ensure we do not
112+
# keep events forever because there was not enough generated
113+
self._generate_feed()
114+
110115
event = self.receive_message().to_dict(jsondict_as_string=True)
111116

112-
obj = self.current_event.add_object(name='intelmq_event')
113-
for object_relation, value in event.items():
117+
cache_size = None
118+
if self.delay_save_event_count:
119+
cache_size = self.cache_put(event)
120+
121+
if cache_size is None:
122+
self._generate_feed(event)
123+
elif cache_size >= self.delay_save_event_count:
124+
self._generate_feed()
125+
126+
self.acknowledge_message()
127+
128+
def _add_message_to_feed(self, message: dict):
129+
obj = self.current_event.add_object(name="intelmq_event")
130+
for object_relation, value in message.items():
114131
try:
115132
obj.add_attribute(object_relation, value=value)
116133
except NewAttributeError:
117134
# This entry isn't listed in the harmonization file, ignoring.
118135
pass
119136

120-
feed_output = self.current_event.to_feed(with_meta=False)
137+
def _generate_feed(self, message: dict = None):
138+
if message:
139+
self._add_message_to_feed(message)
140+
141+
while message := self.cache_pop():
142+
self._add_message_to_feed(message)
121143

122-
with self.current_file.open('w') as f:
144+
feed_output = self.current_event.to_feed(with_meta=False)
145+
with self.current_file.open("w") as f:
123146
json.dump(feed_output, f)
124147

125148
feed_meta_generator(self.output_dir)
126-
self.acknowledge_message()
127149

128150
@staticmethod
129151
def check(parameters):

intelmq/lib/bot.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,10 @@ def catch_shutdown():
279279
def harmonization(self):
280280
return self._harmonization
281281

282+
@property
283+
def bot_id(self):
284+
return self.__bot_id_full
285+
282286
def __handle_sigterm_signal(self, signum: int, stack: Optional[object]):
283287
"""
284288
Calls when a SIGTERM is received. Stops the bot.

intelmq/lib/mixins/cache.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
""" CacheMixin for IntelMQ
1+
"""CacheMixin for IntelMQ
22
33
SPDX-FileCopyrightText: 2021 Sebastian Waldbauer
44
SPDX-License-Identifier: AGPL-3.0-or-later
55
66
CacheMixin is used for caching/storing data in redis.
77
"""
88

9+
import json
910
from typing import Any, Optional
1011
import redis
1112
import intelmq.lib.utils as utils
@@ -31,7 +32,9 @@ def __init__(self, **kwargs):
3132
"socket_timeout": 5,
3233
}
3334

34-
self.__redis = redis.Redis(db=self.redis_cache_db, password=self.redis_cache_password, **kwargs)
35+
self.__redis = redis.Redis(
36+
db=self.redis_cache_db, password=self.redis_cache_password, **kwargs
37+
)
3538
super().__init__()
3639

3740
def cache_exists(self, key: str):
@@ -51,6 +54,17 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None):
5154
if self.redis_cache_ttl:
5255
self.__redis.expire(key, self.redis_cache_ttl)
5356

57+
def cache_put(self, value: dict) -> int:
58+
# Returns the length of the list after pushing
59+
size = self.__redis.lpush(self.bot_id, json.dumps(value))
60+
return size
61+
62+
def cache_pop(self) -> dict:
63+
data = self.__redis.rpop(self.bot_id)
64+
if data is None:
65+
return None
66+
return json.loads(data)
67+
5468
def cache_flush(self):
5569
"""
5670
Flushes the currently opened database by calling FLUSHDB.

intelmq/tests/bots/outputs/misp/test_output_feed.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
# SPDX-License-Identifier: AGPL-3.0-or-later
44

55
# -*- coding: utf-8 -*-
6+
import json
67
import unittest
7-
import sys
8+
from pathlib import Path
89
from tempfile import TemporaryDirectory
910

1011
import intelmq.lib.test as test
@@ -37,9 +38,9 @@
3738

3839
@test.skip_exotic()
3940
class TestMISPFeedOutputBot(test.BotTestCase, unittest.TestCase):
40-
4141
@classmethod
4242
def set_bot(cls):
43+
cls.use_cache = True
4344
cls.bot_reference = MISPFeedOutputBot
4445
cls.default_input_message = EXAMPLE_EVENT
4546
cls.directory = TemporaryDirectory()
@@ -51,10 +52,57 @@ def set_bot(cls):
5152
def test_event(self):
5253
self.run_bot()
5354

55+
current_event = open(f"{self.directory.name}/.current").read()
56+
with open(current_event) as f:
57+
objects = json.load(f).get("Event", {}).get("Object", [])
58+
assert len(objects) == 1
59+
60+
def test_accumulating_events(self):
61+
self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT]
62+
self.run_bot(iterations=2, parameters={"delay_save_event_count": 3})
63+
64+
current_event = open(f"{self.directory.name}/.current").read()
65+
66+
# First, the feed is empty - not enough events came
67+
with open(current_event) as f:
68+
objects = json.load(f).get("Event", {}).get("Object", [])
69+
assert len(objects) == 0
70+
71+
self.input_message = [EXAMPLE_EVENT]
72+
self.run_bot(parameters={"delay_save_event_count": 3})
73+
74+
# When enough events were collected, save them
75+
with open(current_event) as f:
76+
objects = json.load(f)["Event"]["Object"]
77+
assert len(objects) == 3
78+
79+
self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT]
80+
self.run_bot(iterations=3, parameters={"delay_save_event_count": 3})
81+
82+
# We continue saving to the same file until interval timeout
83+
with open(current_event) as f:
84+
objects = json.load(f)["Event"]["Object"]
85+
assert len(objects) == 6
86+
87+
# Simulating leftovers in the queue when it's time to generate new event
88+
Path(f"{self.directory.name}/.current").unlink()
89+
self.bot.cache_put(EXAMPLE_EVENT)
90+
self.run_bot(parameters={"delay_save_event_count": 3})
91+
92+
new_event = open(f"{self.directory.name}/.current").read()
93+
with open(new_event) as f:
94+
objects = json.load(f)["Event"]["Object"]
95+
assert len(objects) == 1
96+
97+
98+
def tearDown(self):
99+
self.cache.delete(self.bot_id)
100+
super().tearDown()
101+
54102
@classmethod
55103
def tearDownClass(cls):
56104
cls.directory.cleanup()
57105

58106

59-
if __name__ == '__main__': # pragma: no cover
107+
if __name__ == "__main__": # pragma: no cover
60108
unittest.main()

0 commit comments

Comments
 (0)