13
13
from base64 import b64decode
14
14
from collections import OrderedDict
15
15
from io import StringIO
16
+ from hashlib import sha256
16
17
17
18
from redis .exceptions import TimeoutError
18
19
26
27
Envelope = None
27
28
28
29
30
def hash_arbitrary(value: Any) -> bytes:
    """Return the SHA-256 digest of an arbitrary JSON-compatible value.

    The value is first turned into bytes, then hashed:

    - ``str`` is encoded as UTF-8.
    - ``int`` (including ``bool``) is encoded as its decimal text.
    - anything else is serialised with ``json.dumps(..., sort_keys=True)`` so
      that dictionaries with the same content hash identically regardless of
      key order.

    Parameters:
        value: the value to hash; non-str/int values must be JSON serialisable.

    Returns:
        The 32-byte raw SHA-256 digest.

    Raises:
        TypeError: if ``value`` is neither str nor int and cannot be
            serialised by ``json.dumps``.
    """
    if isinstance(value, str):
        value_bytes = value.encode("utf-8")
    elif isinstance(value, int):
        # Do NOT use bytes(value) here: bytes(n) builds a buffer of n zero
        # bytes, so memory use grows with the number itself (e.g. an AS number
        # in the billions allocates gigabytes) and negative numbers raise
        # ValueError. The decimal text is compact and works for every int.
        # NOTE(review): an int and a str holding the same digits now hash
        # alike; presumably a grouping key always carries one type - confirm.
        value_bytes = str(value).encode("utf-8")
    else:
        value_bytes = json.dumps(value, sort_keys=True).encode("utf-8")
    return sha256(value_bytes).digest()
39
+
29
40
@dataclass
30
41
class Mail :
31
42
key : str
@@ -36,6 +47,7 @@ class Mail:
36
47
37
48
class SMTPBatchOutputBot (Bot ):
38
49
# configurable parameters
50
+ additional_grouping_keys : Optional [list ] = [] # refers to the event directly
39
51
alternative_mails : Optional [str ] = None
40
52
bcc : Optional [list ] = None
41
53
email_from : str = ""
@@ -75,7 +87,20 @@ def process(self):
75
87
if "source.abuse_contact" in message :
76
88
field = message ["source.abuse_contact" ]
77
89
for mail in (field if isinstance (field , list ) else [field ]):
78
- self .cache .redis .rpush (f"{ self .key } { mail } " , message .to_json ())
90
+ # - Each event goes into one bucket (equivalent to group-by)
91
+ # - The id of each bucket is calculated by hashing all the keys that should be grouped by
92
+ # - Hashing ensures the redis-key does not grow indefinitely.
93
+ # - In order to avoid collisions, each value is hashed before
94
+ # appending to the input for the redis-key-hash
95
+ # (could also be solved by special separator which would need
96
+ # to be escaped or prepending the length of the value).
97
+ h = sha256 ()
98
+ h .update (sha256 (mail .encode ("utf-8" )).digest ())
99
+ for i in self .additional_grouping_keys :
100
+ if i not in message :
101
+ continue
102
+ h .update (hash_arbitrary (message [i ]))
103
+ self .cache .redis .rpush (f"{ self .key } { h .hexdigest ()} " , message .to_json ())
79
104
80
105
self .acknowledge_message ()
81
106
@@ -254,7 +279,11 @@ def prepare_mails(self):
254
279
# TODO: worthy to generate on the fly https://github.com/certtools/intelmq/pull/2253#discussion_r1172779620
255
280
fieldnames = set ()
256
281
rows_output = []
282
+ src_abuse_contact = None
257
283
for row in lines :
284
+ # obtain this field only once as it is the same for all lines here
285
+ if not src_abuse_contact :
286
+ src_abuse_contact = row ["source.abuse_contact" ]
258
287
try :
259
288
if threshold and row ["time.observation" ][:19 ] < threshold .isoformat ()[:19 ]:
260
289
continue
@@ -283,29 +312,38 @@ def prepare_mails(self):
283
312
dict_writer .writerow (dict (zip (ordered_fieldnames , ordered_fieldnames )))
284
313
dict_writer .writerows (rows_output )
285
314
286
- email_to = str (mail_record [len (self .key ):], encoding = "utf-8" )
287
315
count = len (rows_output )
288
- if not count :
289
- path = None
290
- else :
291
- filename = f'{ time .strftime ("%y%m%d" )} _{ count } _events'
292
- path = NamedTemporaryFile ().name
293
-
294
- with zipfile .ZipFile (path , mode = 'w' , compression = zipfile .ZIP_DEFLATED ) as zf :
295
- try :
296
- zf .writestr (filename + ".csv" , output .getvalue ())
297
- except Exception :
298
- self .logger .error ("Error: Cannot zip mail: %r" , mail_record )
299
- continue
316
+ if not count or count == 0 :
317
+ # send no mail if no events are present
318
+ continue
319
+
320
+ # collect all data which must be the same for all events of the
321
+ # bucket and thus can be used for templating
322
+ template_data = {
323
+ k .replace ("." , "_" ): lines [0 ][k ]
324
+ for k in ["source.abuse_contact" ] + self .additional_grouping_keys
325
+ if k in rows_output [0 ]
326
+ }
327
+
328
+ email_to = template_data ["source_abuse_contact" ]
329
+ filename = f'{ time .strftime ("%y%m%d" )} _{ count } _events'
330
+ path = NamedTemporaryFile ().name
331
+
332
+ with zipfile .ZipFile (path , mode = 'w' , compression = zipfile .ZIP_DEFLATED ) as zf :
333
+ try :
334
+ zf .writestr (filename + ".csv" , output .getvalue ())
335
+ except Exception :
336
+ self .logger .error ("Error: Cannot zip mail: %r" , mail_record )
337
+ continue
300
338
301
- if email_to in self .alternative_mail :
302
- print (f"Alternative: instead of { email_to } we use { self .alternative_mail [email_to ]} " )
303
- email_to = self .alternative_mail [email_to ]
339
+ if email_to in self .alternative_mail :
340
+ print (f"Alternative: instead of { email_to } we use { self .alternative_mail [email_to ]} " )
341
+ email_to = self .alternative_mail [email_to ]
304
342
305
343
mail = Mail (mail_record , email_to , path , count )
344
+ # build_mail only used to output metadata of the mail -> send=False -> return None
306
345
self .build_mail (mail , send = False )
307
- if count :
308
- yield mail
346
+ yield mail
309
347
310
348
def build_mail (self , mail , send = False , override_to = None ):
311
349
""" creates a MIME message
0 commit comments