diff --git a/docs/user/bots.md b/docs/user/bots.md index 6f0548628..933e6bcaa 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -5097,6 +5097,28 @@ original2@email.com,person1@email.com;person2@email.com original3@email.com, Mary ; John ``` +**`additional_grouping_keys`** + +(optional, list) By-default events are grouped by the E-Mail-Address into buckets. For each bucket one E-Mail is sent. You may add more fields to group-by here to make potentially more buckets. +Side-effect: Every field that is included in the group-by is ensured to be unique for all events in the bucket and may thus be used for templating. +Note: The keys listed here refer to the keys in the events (in contrast to the CSV column names). +Default: `[]` + +**`templating`** + +(optional, dict) Defines which strings should be processed by jinja2 templating. For templating only keys which are unique for the complete bucket are available. This always includes the destination address (`source.abuse_contact`) and all keys of `additional_grouping_keys` which are present in the bucket. There is one additional key `current_time` available which holds a `datetime.datetime` object of the current (local) time. +Note: The keys available for templating refer to the keys defined for the events (in contrast to the CSV column names). Still the keys get transformed: each `'.'` gets replaced to `_` in order to make referencing the key in jinja2 easier. +Default: `{subject: False, body: False, attachment: False}` + +**`allowed_fieldnames`** + +(optional, list) Lists the fields which are included in the csv file. Every element should be also included in `fieldnames_translation` to avoid crashes. + +**`fieldnames_translation`** + +(optional, dict) Maps each the name of each field listed in `allowed_fieldnames` to a different name to be used in the csv header. +**Warning:** The Bot will crash on sending in case a fieldname is present in an event and in `allowed_fieldnames` but not in `fieldnames_translation`. + **`attachment_name`** (optional, string) Attachment file name for the outgoing messages. May contain date formatting like this `%Y-%m-%d`. Example: "events_%Y-%m-%d" will appear as "events_2022-12-01.zip". Defaults to "intelmq_%Y-%m-%d". diff --git a/intelmq/bots/outputs/smtp_batch/output.py b/intelmq/bots/outputs/smtp_batch/output.py index b5aa5f099..14ef590d8 100644 --- a/intelmq/bots/outputs/smtp_batch/output.py +++ b/intelmq/bots/outputs/smtp_batch/output.py @@ -8,11 +8,12 @@ import sys from tempfile import NamedTemporaryFile import time -from typing import Any, Optional +from typing import Any, Iterable, Optional, Dict, List import zipfile from base64 import b64decode from collections import OrderedDict from io import StringIO +from hashlib import sha256 from redis.exceptions import TimeoutError @@ -25,6 +26,23 @@ except ImportError: Envelope = None +try: + import jinja2 + jinja_env = jinja2.Environment() +except ImportError: + jinja2 = None + + +def hash_arbitrary(value: Any) -> bytes: + value_bytes = None + if isinstance(value, str): + value_bytes = value.encode("utf-8") + elif isinstance(value, int): + value_bytes = bytes(value) + else: + value_bytes = json.dumps(value, sort_keys=True).encode("utf-8") + return sha256(value_bytes).digest() + @dataclass class Mail: @@ -32,10 +50,13 @@ class Mail: to: str path: str count: int + template_data: Dict[str, Any] class SMTPBatchOutputBot(Bot): # configurable parameters + additional_grouping_keys: Optional[list] = [] # refers to the event directly + templating: Optional[Dict[str, bool]] = {'subject': False, 'body': False, 'attachment': False} alternative_mails: Optional[str] = None bcc: Optional[list] = None email_from: str = "" @@ -75,7 +96,20 @@ def process(self): if "source.abuse_contact" in message: field = message["source.abuse_contact"] for mail in (field if isinstance(field, list) else [field]): - self.cache.redis.rpush(f"{self.key}{mail}", message.to_json()) + # - Each event goes into one bucket (equivalent to group-by) + # - The id of each bucket is calculated by hashing all the keys that should be grouped for + # - Hashing ensures the redis-key does not grow indefinitely. + # - In order to avoid collisions, each value is hashed before + # appending to the input for the redis-key-hash + # (could also be solved by special separator which would need + # to be escaped or prepending the length of the value). + h = sha256() + h.update(sha256(mail.encode("utf-8")).digest()) + for i in self.additional_grouping_keys: + if i not in message: + continue + h.update(hash_arbitrary(message[i])) + self.cache.redis.rpush(f"{self.key}{h.hexdigest()}", message.to_json()) self.acknowledge_message() @@ -90,6 +124,8 @@ def set_cache(self): def init(self): if Envelope is None: raise MissingDependencyError('envelope', '>=2.0.0') + if jinja2 is None: + self.logger.warning("No jinja2 installed. Thus, the templating is deactivated.") self.set_cache() self.key = f"{self._Bot__bot_id}:" @@ -213,7 +249,7 @@ def set_tester(self, force=True): print("\nWhat e-mail should I use?") self.testing_to = input() - def send_mails_to_tester(self, mails): + def send_mails_to_tester(self, mails: List[Mail]): """ These mails are going to tester's address. Then prints out their count. :param mails: list @@ -222,7 +258,7 @@ def send_mails_to_tester(self, mails): count = sum([1 for mail in mails if self.build_mail(mail, send=True, override_to=self.testing_to)]) print(f"{count}× mail sent to: {self.testing_to}\n") - def prepare_mails(self): + def prepare_mails(self) -> Iterable[Mail]: """ Generates Mail objects """ for mail_record in self.cache.redis.keys(f"{self.key}*")[slice(self.limit_results)]: @@ -254,7 +290,11 @@ def prepare_mails(self): # TODO: worthy to generate on the fly https://github.com/certtools/intelmq/pull/2253#discussion_r1172779620 fieldnames = set() rows_output = [] + src_abuse_contact = None for row in lines: + # obtain this field only once as it is the same for all lines here + if not src_abuse_contact: + src_abuse_contact = row["source.abuse_contact"] try: if threshold and row["time.observation"][:19] < threshold.isoformat()[:19]: continue @@ -283,31 +323,45 @@ def prepare_mails(self): dict_writer.writerow(dict(zip(ordered_fieldnames, ordered_fieldnames))) dict_writer.writerows(rows_output) - email_to = str(mail_record[len(self.key):], encoding="utf-8") count = len(rows_output) - if not count: - path = None - else: - filename = f'{time.strftime("%y%m%d")}_{count}_events' - path = NamedTemporaryFile().name - - with zipfile.ZipFile(path, mode='w', compression=zipfile.ZIP_DEFLATED) as zf: - try: - zf.writestr(filename + ".csv", output.getvalue()) - except Exception: - self.logger.error("Error: Cannot zip mail: %r", mail_record) - continue + if not count or count == 0: + # send no mail if no events are present + continue + + # collect all data which must be the same for all events of the + # bucket and thus can be used for templating + template_keys = ['source.abuse_contact'] + # only collect if templating is enabled (save the memory otherwise)+ + if jinja2 and self.templating and any(self.templating.values()): + template_keys.extend(self.additional_grouping_keys) + + template_data = { + k.replace(".", "_"): lines[0][k] + for k in template_keys + if k in lines[0] + } + + email_to = template_data["source_abuse_contact"] + filename = f'{time.strftime("%y%m%d")}_{count}_events' + path = NamedTemporaryFile().name + + with zipfile.ZipFile(path, mode='w', compression=zipfile.ZIP_DEFLATED) as zf: + try: + zf.writestr(filename + ".csv", output.getvalue()) + except Exception: + self.logger.error("Error: Cannot zip mail: %r", mail_record) + continue - if email_to in self.alternative_mail: - print(f"Alternative: instead of {email_to} we use {self.alternative_mail[email_to]}") - email_to = self.alternative_mail[email_to] + if email_to in self.alternative_mail: + print(f"Alternative: instead of {email_to} we use {self.alternative_mail[email_to]}") + email_to = self.alternative_mail[email_to] - mail = Mail(mail_record, email_to, path, count) + mail = Mail(mail_record, email_to, path, count, template_data) + # build_mail only used to output metadata of the mail -> send=False -> return None self.build_mail(mail, send=False) - if count: - yield mail + yield mail - def build_mail(self, mail, send=False, override_to=None): + def build_mail(self, mail: Mail, send=False, override_to=None): """ creates a MIME message :param mail: Mail object :param send: True to send through SMTP, False for just printing the information @@ -322,15 +376,32 @@ def build_mail(self, mail, send=False, override_to=None): intended_to = None email_to = mail.to email_from = self.email_from + + template_data = mail.template_data + text = self.mail_contents - try: - subject = time.strftime(self.subject) - except ValueError: - subject = self.subject - try: - attachment_name = time.strftime(self.attachment_name) - except ValueError: - attachment_name = self.attachment_name + if jinja2 and self.templating and self.templating.get('body', False): + jinja_tmpl = jinja_env.from_string(text) + text = jinja_tmpl.render(current_time=datetime.datetime.now(), **template_data) + + if jinja2 and self.templating and self.templating.get('subject', False): + jinja_tmpl = jinja_env.from_string(self.subject) + subject = jinja_tmpl.render(current_time=datetime.datetime.now(), **template_data) + else: + try: + subject = time.strftime(self.subject) + except ValueError: + subject = self.subject + + if jinja2 and self.templating and self.templating.get('attachment', False): + jinja_tmpl = jinja_env.from_string(self.attachment_name) + attachment_name = jinja_tmpl.render(current_time=datetime.datetime.now(), **template_data) + else: + try: + attachment_name = time.strftime(self.attachment_name) + except ValueError: + attachment_name = self.attachment_name + if intended_to: subject += f" (intended for {intended_to})" else: diff --git a/intelmq/tests/bots/outputs/smtp_batch/test_output.py b/intelmq/tests/bots/outputs/smtp_batch/test_output.py index 36b3daa0d..f01790dc6 100644 --- a/intelmq/tests/bots/outputs/smtp_batch/test_output.py +++ b/intelmq/tests/bots/outputs/smtp_batch/test_output.py @@ -11,19 +11,28 @@ import intelmq.lib.test as test from intelmq.bots.outputs.smtp_batch.output import SMTPBatchOutputBot -from intelmq.lib.cache import Cache from intelmq.lib.exceptions import MissingDependencyError +from hashlib import sha256 + + +############## +# BASIC TEST # +############## BOT_ID = "test-bot" + IDENTITY1 = 'one@example.com' -KEY1 = f"{BOT_ID}:{IDENTITY1}".encode() +IDENTITY1_HASH = sha256(sha256(IDENTITY1.encode()).digest()).hexdigest() +KEY1 = f"{BOT_ID}:{IDENTITY1_HASH}".encode() EVENT1 = {'__type': 'Event', 'source.ip': '127.0.0.1', 'source.url': 'http://example.com/', 'source.abuse_contact': IDENTITY1 } + IDENTITY2 = 'one@example2.com' -KEY2 = f"{BOT_ID}:{IDENTITY2}".encode() +IDENTITY2_HASH = sha256(sha256(IDENTITY2.encode()).digest()).hexdigest() +KEY2 = f"{BOT_ID}:{IDENTITY2_HASH}".encode() EVENT2 = {'__type': 'Event', 'source.ip': '127.0.0.2', 'source.url': 'http://example2.com/', @@ -34,6 +43,49 @@ FROM_IDENTITY = "from-example@example.com" +################# +# ADVANCED TEST # +################# + +def calc_key(identity, tmpl_data): + h = sha256() + h.update(sha256(identity.encode()).digest()) + h.update(sha256(tmpl_data.encode()).digest()) + return h.hexdigest() + +TESTB_BOT_ID = "test-bot" + +TESTB_IDENTITY1 = 'one@example.com' +TESTB_KEY1 = f"{TESTB_BOT_ID}:{calc_key(TESTB_IDENTITY1, 'valueA')}".encode() +TESTB_EVENT1 = {'__type': 'Event', + 'source.ip': '127.0.0.1', + 'source.url': 'http://example.com/', + 'extra.template_value': 'valueA', + 'source.abuse_contact': TESTB_IDENTITY1 + } + +TESTB_IDENTITY1B = 'one@example.com' +TESTB_KEY1B = f"{TESTB_BOT_ID}:{calc_key(TESTB_IDENTITY1B, 'valueB')}".encode() +TESTB_EVENT1B = {'__type': 'Event', + 'source.ip': '127.0.0.1', + 'source.url': 'http://example.com/', + 'extra.template_value': 'valueB', + 'source.abuse_contact': TESTB_IDENTITY1B + } + +TESTB_IDENTITY2 = 'one@example2.com' +TESTB_KEY2 = f"{TESTB_BOT_ID}:{calc_key(TESTB_IDENTITY2, 'valueC')}".encode() +TESTB_EVENT2 = {'__type': 'Event', + 'source.ip': '127.0.0.2', + 'source.url': 'http://example2.com/', + 'extra.template_value': 'valueC', + 'source.abuse_contact': TESTB_IDENTITY2 + } + +TESTB_MAIL_TEMPLATE = Path(__file__).parent / "mail_template.txt" +TESTB_FROM_IDENTITY = "from-example@example.com" + + @test.skip_exotic() @test.skip_redis() class TestSMTPBatchOutputBot(test.BotTestCase, unittest.TestCase): @@ -114,5 +166,89 @@ def test_processing(self): self.assertCountEqual([], redis.keys(f"{self.bot.key}*")) +@test.skip_exotic() +@test.skip_redis() +class TestSMTPBatchTemplatedOutputBot(test.BotTestCase, unittest.TestCase): + + def setUp(self): + self.sent_messages = [] # here we collect e-mail messages there were to be sent + + @classmethod + def set_bot(cls): + cls.bot_reference = SMTPBatchOutputBot + cls.use_cache = True + cls.sysconfig = {"alternative_mails": "", + "attachment_name": "events_%Y-%m-%d", + "bcc": [], + "email_from": TESTB_FROM_IDENTITY, + "gpg_key": "", + "gpg_pass": "", + "mail_template": str(TESTB_MAIL_TEMPLATE.resolve()), + "ignore_older_than_days": 0, + "limit_results": 10, + "smtp_server": "localhost", + "subject": "Testing subject {{ extra_template_value }} -- {{ current_time.strftime('%Y-%m-%d') }}", + "additional_grouping_keys": ['extra.template_value'], + "templating": {'subject': True}, + "testing_to": "" + } + + if not Envelope: + raise MissingDependencyError('envelope', '>=2.0.0') + + def compare_envelope(self, envelope: Envelope, subject, message, from_, to): + self.assertEqual(subject, envelope.subject()) + self.assertEqual(message, envelope.message()) + self.assertEqual(from_, envelope.from_()) + self.assertEqual(to, envelope.to()) + + def send_message(self): + def _(envelope): + self.sent_messages.append(envelope) + return True # let's pretend the message sending succeeded + return _ + + def test_processing(self): + redis = self.cache + time_string = time.strftime("%Y-%m-%d") + message = TESTB_MAIL_TEMPLATE.read_text() + self.input_message = (TESTB_EVENT1, TESTB_EVENT1B, TESTB_EVENT1B, TESTB_EVENT1, TESTB_EVENT2, TESTB_EVENT1) + + # if tests failed before, there might be left records from the last time + [redis.delete(k) for k in (TESTB_KEY1, TESTB_KEY1B, TESTB_KEY2)] + + # process messages + self.run_bot(iterations=6) + + # event should be in the DB + self.assertCountEqual([TESTB_KEY1, TESTB_KEY1B, TESTB_KEY2], redis.keys(f"{self.bot.key}*")) + self.assertEqual(3, len(redis.lrange(TESTB_KEY1, 0, -1))) + self.assertEqual(2, len(redis.lrange(TESTB_KEY1B, 0, -1))) + self.assertEqual(1, len(redis.lrange(TESTB_KEY2, 0, -1))) + + # run the CLI interface with the --send parameter, it should send the messages and exit + with unittest.mock.patch('envelope.Envelope.send', new=self.send_message()): + self.bot.send = True + self.assertRaises(SystemExit, self.bot.cli_run) + + # bring the messages in a consistent order to make the comparison/assertion easier + msg1, msg1b, msg2 = sorted(self.sent_messages, key=lambda x: x.to()[0]+x.subject()) + # msg1, msg1b, msg2, TESTB_IDENTITY1, TESTB_IDENTITY1b, TESTB_IDENTITY2 + + self.compare_envelope( + msg1, f"Testing subject valueA -- {time_string} ({TESTB_IDENTITY1})", message, TESTB_FROM_IDENTITY, [TESTB_IDENTITY1]) + self.compare_envelope( + msg1b, f"Testing subject valueB -- {time_string} ({TESTB_IDENTITY1B})", message, TESTB_FROM_IDENTITY, [TESTB_IDENTITY1B]) + self.compare_envelope( + msg2, f"Testing subject valueC -- {time_string} ({TESTB_IDENTITY2})", message, TESTB_FROM_IDENTITY, [TESTB_IDENTITY2]) + + # we expect this ZIP attachment + self.assertTrue(self.sent_messages[1] + .attachments(f"events_{time_string}.zip")) + + # messages should have disappeared from the redis + self.assertCountEqual([], redis.keys(f"{self.bot.key}*")) + + if __name__ == '__main__': # pragma: no cover unittest.main()