# SPDX-FileCopyrightText: 2018 Karel
#
# SPDX-License-Identifier: AGPL-3.0-or-later

| 5 | +r""" |
| 6 | +
|
| 7 | +Parser of text intended to obtain IOCs from tweets. |
| 8 | +First substitutions are performed and then words in the text are compared with |
| 9 | +'(/|^)([a-z0-9.-]+\.[a-z0-9]+?)([/:]|$)' |
| 10 | +In the case of a match it is checked whether this can be a valid domain using get_tld |
| 11 | +There is also a whitelist for filtering out good domains. |
| 12 | +
|
| 13 | +
|
| 14 | +Parameters: |
| 15 | + domain_whitelist : domains that will be ignored in parsing |
| 16 | +
|
| 17 | + substitutions : semicolon separated list of pairs |
| 18 | + substitutions that will be made in the text, |
| 19 | + for example " .com,.com" enables parsing of one fuzzy format "[.];." enables the parsing of another fuzzy format |
| 20 | +
|
| 21 | + classification_type : string with a valid classificationtype |
| 22 | +""" |
import re
import pkg_resources

from typing import Optional, Iterable

from intelmq.lib.bot import ParserBot, utils
from intelmq.lib.exceptions import InvalidArgument, MissingDependencyError
from intelmq.lib.harmonization import ClassificationType

try:
    from url_normalize import url_normalize
except ImportError:
    url_normalize = None

try:
    import tld.exceptions
    from tld import get_tld
    from tld.utils import update_tld_names
except ImportError:
    get_tld = None
    update_tld_names = None


class IocExtractorParserBot(ParserBot):
    """Parse tweets and extract IoC data. Currently only URLs are supported; a whitelist of safe domains can be provided."""
    default_scheme: Optional[str] = None
    domain_whitelist: str = 't.co'
    _domain_whitelist: Iterable[str] = []
    # Each substitution pair replaces its first element with its second, i.e.
    # the defanged form with the plain one ("[.]net" -> ".net").
    substitutions: str = "[.]net;.net"
    _substitutions: Iterable[str] = []
    classification_type: str = "blacklist"

    def init(self):
        if url_normalize is None:
            raise MissingDependencyError("url-normalize")
        url_version = pkg_resources.get_distribution("url-normalize").version
        if tuple(int(v) for v in url_version.split('.')) < (1, 4, 1) and self.default_scheme is not None:
            raise ValueError("Parameter 'default_scheme' given but 'url-normalize' version %r does not support it. "
                             "Get at least version '1.4.1'." % url_version)
        if get_tld is None:
            raise MissingDependencyError("tld")
        try:
            update_tld_names()
        except tld.exceptions.TldIOError:
            self.logger.info("Could not update TLD names cache.")
        if self.domain_whitelist != '':
            self._domain_whitelist.extend(self.domain_whitelist.split(','))
        if self.substitutions != '':
            temp = self.substitutions.split(';')
            if len(temp) % 2 != 0:
                raise InvalidArgument(
                    'substitutions',
                    got=self.substitutions,
                    expected="even number of ; separated strings")
            for i in range(len(temp) // 2):
                self._substitutions.append([temp[2 * i], temp[2 * i + 1]])
        if not ClassificationType.is_valid(self.classification_type):
            self.classification_type = 'unknown'

        if self.default_scheme is not None:
            self.url_kwargs = {'default_scheme': self.default_scheme}
        else:
            self.url_kwargs = {}

    def get_domain(self, address):
        try:
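            # group(2) captures the candidate host together with any trailing
            # path ('/...') or port (':...'); group(1) anchors the match at a
            # '//' (as in 'http://') or at the start of the token.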
            dom = re.search(r'(//|^)([a-z0-9.-]*[a-z]\.[a-z][a-z-]*?(?:[/:].*|$))', address).group(2)
            if not self.in_whitelist(dom):
                normalized = url_normalize(dom, **self.url_kwargs)
                if get_tld(normalized, fail_silently=True):
                    return normalized
            return None
        except AttributeError:
            return None
        except UnicodeError:  # url_normalize's error; happens when something odd matches the regex
            self.logger.info("Caught UnicodeError on %r.", address)
            return None

    def in_whitelist(self, domain: str) -> bool:
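        # Note: whitelist entries are applied with re.search(), so they are
        # regular expressions, not literal strings: 't.co' also matches,
        # e.g., 'at.com'.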
        for dom_clean in self._domain_whitelist:
            if re.search(dom_clean, domain):
                return True
        return False

    def get_data_from_text(self, text) -> list:
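        # The text is first refanged via the configured substitutions, then
        # tokenized on whitespace; each lowercased token is checked for a
        # domain independently.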
        data = []
        for sub in self._substitutions:  # allows for custom matches
            text = text.replace(sub[0], sub[1])
        tweet_text = [part.strip().lower() for part in text.split()]
        for part in tweet_text:
            domain = self.get_domain(part)
            if domain:
                data.append(domain)
        return data

    def process(self):
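        # One event is emitted per extracted URL; the full decoded tweet text
        # is re-attached as 'raw' to every event for traceability.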
        report = self.receive_message()
        text = utils.base64_decode(report["raw"])
        data = self.get_data_from_text(text)
        for url in data:
            event = self.new_event(report)
            event.add('raw', text)
            event.add('source.url', url)
            event.add('classification.type', self.classification_type)
            self.send_message(event)
        self.acknowledge_message()


BOT = IocExtractorParserBot
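

# A minimal, self-contained sketch (illustration only, not part of the bot's
# API) of what the substitution + regex pipeline above extracts from a
# defanged tweet. The get_tld() validation, URL normalization and domain
# whitelisting that the bot also performs are omitted for brevity; the
# substitution pairs here are examples, not the bot's defaults.
def _demo_extract(text: str) -> list:
    for old, new in (('[.]', '.'), ('hxxp', 'http')):
        text = text.replace(old, new)
    found = []
    for word in (token.strip().lower() for token in text.split()):
        match = re.search(r'(//|^)([a-z0-9.-]*[a-z]\.[a-z][a-z-]*?(?:[/:].*|$))', word)
        if match:
            found.append(match.group(2))
    return found

# Example: _demo_extract("C2 at evil[.]example hxxp://bad.example/path")
# returns ['evil.example', 'bad.example/path']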