diff --git a/aikido_firewall/__init__.py b/aikido_firewall/__init__.py index 73b1b3d11..9b6feb4e5 100644 --- a/aikido_firewall/__init__.py +++ b/aikido_firewall/__init__.py @@ -40,5 +40,7 @@ def protect(module="any", server=True): import aikido_firewall.sinks.mysqlclient import aikido_firewall.sinks.pymongo import aikido_firewall.sinks.psycopg2 + import aikido_firewall.sinks.http_client + import aikido_firewall.sinks.socket logger.info("Aikido python firewall started") diff --git a/aikido_firewall/errors/__init__.py b/aikido_firewall/errors/__init__.py index 9350dc88b..a8f211500 100644 --- a/aikido_firewall/errors/__init__.py +++ b/aikido_firewall/errors/__init__.py @@ -21,3 +21,7 @@ class AikidoRateLimiting(AikidoException): def __init__(self, message="You are rate limited by Aikido firewall."): super().__init__(message) self.message = message + + +class AikidoSSRF(AikidoException): + """Exception because of SSRF""" diff --git a/aikido_firewall/helpers/get_port_from_url.py b/aikido_firewall/helpers/get_port_from_url.py new file mode 100644 index 000000000..4ca379e14 --- /dev/null +++ b/aikido_firewall/helpers/get_port_from_url.py @@ -0,0 +1,22 @@ +""" +Helper function file, see function docstring +""" + +from urllib.parse import urlparse + + +def get_port_from_url(url): + """ + Tries to retrieve a port number from the given url + """ + parsed_url = urlparse(url) + + # Check if the port is specified and is a valid integer + if parsed_url.port is not None: + return parsed_url.port + + # Determine the default port based on the protocol + if parsed_url.scheme == "https": + return 443 + elif parsed_url.scheme == "http": + return 80 diff --git a/aikido_firewall/helpers/get_port_from_url_test.py b/aikido_firewall/helpers/get_port_from_url_test.py new file mode 100644 index 000000000..e8c8cbbd6 --- /dev/null +++ b/aikido_firewall/helpers/get_port_from_url_test.py @@ -0,0 +1,9 @@ +import pytest +from .get_port_from_url import get_port_from_url + + +def test_get_port_from_url(): + assert get_port_from_url("http://localhost:4000") == 4000 + assert get_port_from_url("http://localhost") == 80 + assert get_port_from_url("https://localhost") == 443 + assert get_port_from_url("ftp://localhost") is None diff --git a/aikido_firewall/sinks/http_client.py b/aikido_firewall/sinks/http_client.py new file mode 100644 index 000000000..bbcbab2cd --- /dev/null +++ b/aikido_firewall/sinks/http_client.py @@ -0,0 +1,28 @@ +""" +Sink module for `http` +""" + +import copy +import importhook +from aikido_firewall.helpers.logging import logger + + +@importhook.on_import("http.client") +def on_http_import(http): + """ + Hook 'n wrap on `http.client.HTTPConnection.putrequest` + Our goal is to wrap the putrequest() function of the HTTPConnection class : + https://github.com/python/cpython/blob/372df1950817dfcf8b9bac099448934bf8657cf5/Lib/http/client.py#L1136 + Returns : Modified http.client object + """ + modified_http = importhook.copy_module(http) + former_putrequest = copy.deepcopy(http.HTTPConnection.putrequest) + + def aik_new_putrequest(_self, method, url, *args, **kwargs): + logger.info("HTTP Request [%s] %s:%s %s", method, _self.host, _self.port, url) + return former_putrequest(_self, method, url, *args, **kwargs) + + # pylint: disable=no-member + setattr(http.HTTPConnection, "putrequest", aik_new_putrequest) + logger.debug("Wrapped `http` module") + return modified_http diff --git a/aikido_firewall/sinks/socket.py b/aikido_firewall/sinks/socket.py new file mode 100644 index 000000000..d12a3cf08 --- /dev/null +++ b/aikido_firewall/sinks/socket.py @@ -0,0 +1,53 @@ +""" +Sink module for `socket` +""" + +import copy +import importhook +from aikido_firewall.helpers.logging import logger +from aikido_firewall.vulnerabilities.ssrf.inspect_getaddrinfo_result import ( + inspect_getaddrinfo_result, +) + +SOCKET_OPERATIONS = [ + "gethostbyname", + "gethostbyaddr", + "getaddrinfo", + "create_connection", +] + + +def generate_aikido_function(former_func, op): + """ + Generates a new aikido function given a former function and op + """ + + def aik_new_func(*args, **kwargs): + logger.info("socket.%s() Hostname : `%s`;", op, args[0]) + res = former_func(*args, **kwargs) + if op is "getaddrinfo": + inspect_getaddrinfo_result(dns_results=res, hostname=args[0], port=args[1]) + logger.debug("Res %s", res) + return res + + return aik_new_func + + +@importhook.on_import("socket") +def on_socket_import(socket): + """ + Hook 'n wrap on `socket` + Our goal is to wrap the following socket functions that take a hostname : + - gethostbyname() -- map a hostname to its IP number + - gethostbyaddr() -- map an IP number or hostname to DNS info + https://github.com/python/cpython/blob/8f19be47b6a50059924e1d7b64277ad3cef4dac7/Lib/socket.py#L10 + Returns : Modified http.client object + """ + modified_socket = importhook.copy_module(socket) + for op in SOCKET_OPERATIONS: + former_func = copy.deepcopy(getattr(socket, op)) + setattr(modified_socket, op, generate_aikido_function(former_func, op)) + setattr(socket, op, generate_aikido_function(former_func, op)) + + logger.debug("Wrapped `http` module") + return modified_socket diff --git a/aikido_firewall/sources/flask.py b/aikido_firewall/sources/flask.py index d13596365..31863281e 100644 --- a/aikido_firewall/sources/flask.py +++ b/aikido_firewall/sources/flask.py @@ -28,7 +28,7 @@ def dispatch(self, request, call_next): context.set_as_current_context() response = call_next(request) - comms = get_comms() # get IPC facilitator + comms = get_comms() # get IPC facilitator is_curr_route_useful = is_useful_route( response._status_code, context.route, context.method diff --git a/aikido_firewall/vulnerabilities/ssrf/__init__.py b/aikido_firewall/vulnerabilities/ssrf/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/aikido_firewall/vulnerabilities/ssrf/check_context_for_ssrf.py b/aikido_firewall/vulnerabilities/ssrf/check_context_for_ssrf.py new file mode 100644 index 000000000..0e5e03bd2 --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/check_context_for_ssrf.py @@ -0,0 +1,31 @@ +"""Exports check_context_for_ssrf""" + +from aikido_firewall.helpers.extract_strings_from_user_input import ( + extract_strings_from_user_input, +) +from aikido_firewall.helpers.logging import logger +from aikido_firewall.context import UINPUT_SOURCES as SOURCES +from .find_hostname_in_userinput import find_hostname_in_userinput +from .contains_private_ip_address import contains_private_ip_address + + +def check_context_for_ssrf(hostname, port, operation, context): + """ + This will check the context for SSRF + """ + for source in SOURCES: + logger.debug("Checking source %s for SSRF", source) + if hasattr(context, source): + user_inputs = extract_strings_from_user_input(getattr(context, source)) + for user_input, path in user_inputs.items(): + found = find_hostname_in_userinput(user_input, hostname, port) + if found and contains_private_ip_address(hostname): + return { + "operation": operation, + "kind": "ssrf", + "source": source, + "pathToPayload": path, + "metadata": {}, + "payload": user_input, + } + return {} diff --git a/aikido_firewall/vulnerabilities/ssrf/contains_private_ip_address.py b/aikido_firewall/vulnerabilities/ssrf/contains_private_ip_address.py new file mode 100644 index 000000000..a62caf016 --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/contains_private_ip_address.py @@ -0,0 +1,25 @@ +"""exports contains_private_ip_address""" + +from aikido_firewall.helpers.try_parse_url import try_parse_url +from .is_private_ip import is_private_ip + + +def contains_private_ip_address(hostname): + """ + Checks if the hostname contains an IP that's private + """ + if hostname == "localhost": + return True + + # Attempt to parse the URL + url = try_parse_url(f"http://{hostname}") + if url is None: + return False + + # Check for IPv6 addresses enclosed in square brackets + if url.hostname.startswith("[") and url.hostname.endswith("]"): + ipv6 = url.hostname[1:-1] # Extract the IPv6 address + if is_private_ip(ipv6): + return True + + return is_private_ip(url.hostname) diff --git a/aikido_firewall/vulnerabilities/ssrf/contains_private_ip_address_test.py b/aikido_firewall/vulnerabilities/ssrf/contains_private_ip_address_test.py new file mode 100644 index 000000000..ca4fb197d --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/contains_private_ip_address_test.py @@ -0,0 +1,182 @@ +import pytest +from .contains_private_ip_address import contains_private_ip_address + + +public_ips = [ + "44.37.112.180", + "46.192.247.73", + "71.12.102.112", + "101.0.26.90", + "111.211.73.40", + "156.238.194.84", + "164.101.185.82", + "223.231.138.242", + "::1fff:0.0.0.0", + "::1fff:10.0.0.0", + "::1fff:0:0.0.0.0", + "::1fff:0:10.0.0.0", + "2001:2:ffff:ffff:ffff:ffff:ffff:ffff", + "64:ff9a::0.0.0.0", + "64:ff9a::255.255.255.255", + "99::", + "99::ffff:ffff:ffff:ffff", + "101::", + "101::ffff:ffff:ffff:ffff", + "2000::", + "2000::ffff:ffff:ffff:ffff:ffff:ffff", + "2001:10::", + "2001:1f:ffff:ffff:ffff:ffff:ffff:ffff", + "2001:db7::", + "2001:db7:ffff:ffff:ffff:ffff:ffff:ffff", + "2001:db9::", + "fb00::", + "fbff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", + "fec0::", +] + +private_ips = [ + "0.0.0.0", + "0000.0000.0000.0000", + "0.0.0.1", + "0.0.0.7", + "0.0.0.255", + "0.0.255.255", + "0.1.255.255", + "0.15.255.255", + "0.63.255.255", + "0.255.255.254", + "0.255.255.255", + "10.0.0.0", + "10.0.0.1", + "10.0.0.01", + "10.0.0.001", + "10.255.255.254", + "10.255.255.255", + "100.64.0.0", + "100.64.0.1", + "100.127.255.254", + "100.127.255.255", + "127.0.0.0", + "127.0.0.1", + "127.0.0.01", + "127.000.000.1", + "127.255.255.254", + "127.255.255.255", + "169.254.0.0", + "169.254.0.1", + "169.254.255.254", + "169.254.255.255", + "172.16.0.0", + "172.16.0.1", + "172.16.0.001", + "172.31.255.254", + "172.31.255.255", + "192.0.0.0", + "192.0.0.1", + "192.0.0.6", + "192.0.0.7", + "192.0.0.8", + "192.0.0.9", + "192.0.0.10", + "192.0.0.11", + "192.0.0.170", + "192.0.0.171", + "192.0.0.254", + "192.0.0.255", + "192.0.2.0", + "192.0.2.1", + "192.0.2.254", + "192.0.2.255", + "192.31.196.0", + "192.31.196.1", + "192.31.196.254", + "192.31.196.255", + "192.52.193.0", + "192.52.193.1", + "192.52.193.254", + "192.52.193.255", + "192.88.99.0", + "192.88.99.1", + "192.88.99.254", + "192.88.99.255", + "192.168.0.0", + "192.168.0.1", + "192.168.255.254", + "192.168.255.255", + "192.175.48.0", + "192.175.48.1", + "192.175.48.254", + "192.175.48.255", + "198.18.0.0", + "198.18.0.1", + "198.19.255.254", + "198.19.255.255", + "198.51.100.0", + "198.51.100.1", + "198.51.100.254", + "198.51.100.255", + "203.0.113.0", + "203.0.113.1", + "203.0.113.254", + "203.0.113.255", + "240.0.0.0", + "240.0.0.1", + "224.0.0.0", + "224.0.0.1", + "255.0.0.0", + "255.192.0.0", + "255.240.0.0", + "255.254.0.0", + "255.255.0.0", + "255.255.255.0", + "255.255.255.248", + "255.255.255.254", + "255.255.255.255", + "0000:0000:0000:0000:0000:0000:0000:0000", + "::", + "::1", + "::ffff:0.0.0.0", + "::ffff:127.0.0.1", + "fe80::", + "fe80::1", + "fe80::abc:1", + "febf:ffff:ffff:ffff:ffff:ffff:ffff:ffff", + "fc00::", + "fc00::1", + "fc00::abc:1", + "fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", + "fd00:ec2::254", + "169.254.169.254", +] + +invalid_ips = [ + "100::ffff::", + "::ffff:0.0.255.255.255", + "::ffff:0.255.255.255.255", + "0000.0000", + "127.1", + "127.0.1", + "2130706433", + "0x7f000001", +] + + +def test_public_ips(): + for ip in public_ips: + if ":" in ip: + ip = f"[{ip}]" # IPv6 are enclosed in brackets + assert not contains_private_ip_address(ip), f"Expected {ip} to be public" + + +def test_private_ips(): + for ip in private_ips: + if ":" in ip: + ip = f"[{ip}]" # IPv6 are enclosed in brackets + assert contains_private_ip_address(ip), f"Expected {ip} to be private" + + +def test_invalid_ips(): + for ip in invalid_ips: + if ":" in ip: + ip = f"[{ip}]" # IPv6 are enclosed in brackets + assert not contains_private_ip_address(ip), f"Expected {ip} to be invalid" diff --git a/aikido_firewall/vulnerabilities/ssrf/find_hostname_in_context.py b/aikido_firewall/vulnerabilities/ssrf/find_hostname_in_context.py new file mode 100644 index 000000000..9093ca71c --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/find_hostname_in_context.py @@ -0,0 +1,27 @@ +""" +Mainly exports function `find_hostname_in_context` +""" + +from aikido_firewall.context import UINPUT_SOURCES +from aikido_firewall.helpers.extract_strings_from_user_input import ( + extract_strings_from_user_input, +) +from .find_hostname_in_userinput import find_hostname_in_userinput + + +def find_hostname_in_context(hostname, context, port): + """Tries to locate the given hostname from context""" + for source in UINPUT_SOURCES: + if not hasattr(context, source): + continue + user_inputs = extract_strings_from_user_input(getattr(context, source)) + if not user_inputs: + continue + for user_input, path in user_inputs.items(): + found = find_hostname_in_userinput(user_input, hostname, port) + if found: + return { + "source": source, + "pathToPayload": path, + "payload": user_input, + } diff --git a/aikido_firewall/vulnerabilities/ssrf/find_hostname_in_userinput.py b/aikido_firewall/vulnerabilities/ssrf/find_hostname_in_userinput.py new file mode 100644 index 000000000..caf5ecf85 --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/find_hostname_in_userinput.py @@ -0,0 +1,31 @@ +""" +Only exports find_hostname_in_userinput function +""" + +from aikido_firewall.helpers.get_port_from_url import get_port_from_url +from aikido_firewall.helpers.try_parse_url import try_parse_url + + +def find_hostname_in_userinput(user_input, hostname, port=None): + """ + Returns true if the hostname is in userinput + """ + if len(user_input) <= 1: + return False + + hostname_url = try_parse_url(f"http://{hostname}") + if not hostname_url: + return False + + variants = [user_input, f"http://{user_input}", f"https://{user_input}"] + for variant in variants: + user_input_url = try_parse_url(variant) + if user_input_url and user_input_url.hostname == hostname_url.hostname: + user_port = get_port_from_url(user_input_url.geturl()) + + if port is None: + return True + if port is not None and user_port == port: + return True + + return False diff --git a/aikido_firewall/vulnerabilities/ssrf/find_hostname_in_userinput_test.py b/aikido_firewall/vulnerabilities/ssrf/find_hostname_in_userinput_test.py new file mode 100644 index 000000000..d94514634 --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/find_hostname_in_userinput_test.py @@ -0,0 +1,84 @@ +import pytest +from .find_hostname_in_userinput import find_hostname_in_userinput + + +def test_returns_false_if_user_input_and_hostname_are_empty(): + assert find_hostname_in_userinput("", "") is False + + +def test_returns_false_if_user_input_is_empty(): + assert find_hostname_in_userinput("", "example.com") is False + + +def test_returns_false_if_hostname_is_empty(): + assert find_hostname_in_userinput("http://example.com", "") is False + + +def test_it_parses_hostname_from_user_input(): + assert find_hostname_in_userinput("http://localhost", "localhost") is True + + +def test_it_parses_special_ip(): + assert find_hostname_in_userinput("http://localhost", "localhost") is True + + +def test_it_parses_hostname_from_user_input_with_path_behind_it(): + assert find_hostname_in_userinput("http://localhost/path", "localhost") is True + + +def test_it_does_not_parse_hostname_from_user_input_with_misspelled_protocol(): + assert find_hostname_in_userinput("http:/localhost", "localhost") is False + + +def test_it_does_not_parse_hostname_from_user_input_without_protocol_separator(): + assert find_hostname_in_userinput("http:localhost", "localhost") is False + + +def test_it_does_not_parse_hostname_from_user_input_with_misspelled_protocol_and_path_behind_it(): + assert find_hostname_in_userinput("http:/localhost/path/path", "localhost") is False + + +def test_it_parses_hostname_from_user_input_without_protocol_and_path_behind_it(): + assert find_hostname_in_userinput("localhost/path/path", "localhost") is True + + +def test_it_flags_ftp_as_protocol(): + assert find_hostname_in_userinput("ftp://localhost", "localhost") is True + + +def test_it_parses_hostname_from_user_input_without_protocol(): + assert find_hostname_in_userinput("localhost", "localhost") is True + + +def test_it_ignores_invalid_urls(): + assert find_hostname_in_userinput("http://", "localhost") is False + + +def test_user_input_is_smaller_than_hostname(): + assert find_hostname_in_userinput("localhost", "localhost localhost") is False + + +def test_it_finds_ip_address_inside_url(): + assert ( + find_hostname_in_userinput( + "http://169.254.169.254/latest/meta-data/", "169.254.169.254" + ) + is True + ) + + +def test_it_finds_ip_address_with_strange_notation_inside_url(): + assert find_hostname_in_userinput("http://2130706433", "2130706433") is True + assert find_hostname_in_userinput("http://127.1", "127.1") is True + assert find_hostname_in_userinput("http://127.0.1", "127.0.1") is True + + +def test_it_works_with_ports(): + assert find_hostname_in_userinput("http://localhost", "localhost", 8080) is False + assert ( + find_hostname_in_userinput("http://localhost:8080", "localhost", 8080) is True + ) + assert find_hostname_in_userinput("http://localhost:8080", "localhost") is True + assert ( + find_hostname_in_userinput("http://localhost:8080", "localhost", 4321) is False + ) diff --git a/aikido_firewall/vulnerabilities/ssrf/imds.py b/aikido_firewall/vulnerabilities/ssrf/imds.py new file mode 100644 index 000000000..ceafc80c9 --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/imds.py @@ -0,0 +1,46 @@ +""" +imds.py file, exports : +is_imds_ip_address, is_trusted_hostname +""" + + +class BlockList: + """A list of IP's that shouldn't be accessed""" + + def __init__(self): + self.blocked_addresses = {"ipv4": set(), "ipv6": set()} + + def add_address(self, address, address_type): + """Add an address to this list""" + if address_type in self.blocked_addresses: + self.blocked_addresses[address_type].add(address) + + def check(self, address, address_type=None): + """Check if the IP is on the list""" + if address_type: + return address in self.blocked_addresses.get(address_type, set()) + return any( + address in addresses for addresses in self.blocked_addresses.values() + ) + + +# Create an instance of BlockList +imds_addresses = BlockList() + +# Block the IP addresses used by AWS EC2 instances for IMDS +imds_addresses.add_address("169.254.169.254", "ipv4") +imds_addresses.add_address("fd00:ec2::254", "ipv6") + + +def is_imds_ip_address(ip): + """Checks if the IP is an imds ip""" + return imds_addresses.check(ip) or imds_addresses.check(ip, "ipv6") + + +# Trusted hostnames for Google Cloud +trusted_hosts = ["metadata.google.internal", "metadata.goog"] + + +def is_trusted_hostname(hostname): + """Checks if this hostname is trusted""" + return hostname in trusted_hosts diff --git a/aikido_firewall/vulnerabilities/ssrf/imds_test.py b/aikido_firewall/vulnerabilities/ssrf/imds_test.py new file mode 100644 index 000000000..780a80326 --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/imds_test.py @@ -0,0 +1,19 @@ +import pytest +from .imds import is_imds_ip_address + + +# Assuming the is_imds_ip_address function is defined in the same file or imported from another module +def is_imds_ip_address(ip: str) -> bool: + # This is a placeholder for the actual implementation + # You should replace this with the actual function from your code + return ip in ["169.254.169.254", "fd00:ec2::254"] + + +def test_returns_true_for_imds_ip_addresses(): + assert is_imds_ip_address("169.254.169.254") is True + assert is_imds_ip_address("fd00:ec2::254") is True + + +def test_returns_false_for_non_imds_ip_addresses(): + assert is_imds_ip_address("1.2.3.4") is False + assert is_imds_ip_address("example.com") is False diff --git a/aikido_firewall/vulnerabilities/ssrf/inspect_getaddrinfo_result.py b/aikido_firewall/vulnerabilities/ssrf/inspect_getaddrinfo_result.py new file mode 100644 index 000000000..e6da6055b --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/inspect_getaddrinfo_result.py @@ -0,0 +1,88 @@ +""" +Mainly exports inspect_getaddrinfo_result function +""" + +import traceback +from aikido_firewall.helpers.try_parse_url import try_parse_url +from aikido_firewall.context import get_current_context +from aikido_firewall.helpers.logging import logger +from aikido_firewall.background_process import get_comms +from aikido_firewall.errors import AikidoSSRF +from .imds import is_trusted_hostname, is_imds_ip_address +from .is_private_ip import is_private_ip +from .find_hostname_in_context import find_hostname_in_context + +# gets called when the result of the DNS resolution has come in +def inspect_getaddrinfo_result(dns_results, hostname, port): + """Inspect the results of a getaddrinfo() call""" + if not hostname or try_parse_url(hostname) is not None: + # If the hostname is an IP address, we don't need to inspect it + logger.debug("Hostname %s is actually an IP address, ignoring", hostname) + return + + context = get_current_context() + + should_block_res = get_comms().send_data_to_bg_process( + action="READ_PROPERTY", obj="block", receive=True + ) + should_block = should_block_res["success"] and should_block_res["data"] + + ip_addresses = extract_ip_array_from_results(dns_results) + if resolves_to_imds_ip(ip_addresses, hostname): + # Block stored SSRF attack that target IMDS IP addresses + # An attacker could have stored a hostname in a database that points to an IMDS IP address + # We don't check if the user input contains the hostname because there's no context + if should_block: + raise AikidoSSRF() + + if not context: + return + + private_ip = next((ip for ip in ip_addresses if is_private_ip(ip)), None) + if not private_ip: + return + + found = find_hostname_in_context(hostname, context, port) + if not found: + return + + stack = " ".join(traceback.format_stack()) + attack = { + "module": "socket", + "operation": "socket.getaddrinfo", + "kind": "ssrf", + "source": found["source"], + "blocked": should_block, + "stack": stack, + "path": found["pathToPayload"], + "metadata": {"hostname": hostname}, + "payload": found["payload"], + } + logger.debug("Attack results : %s", attack) + + logger.debug("Sending data to bg process :") + get_comms().send_data_to_bg_process("ATTACK", (attack, context)) + + if should_block: + raise AikidoSSRF() + + +def resolves_to_imds_ip(resolved_ip_addresses, hostname): + """ + returns a boolean, true if the IP is an imds ip + """ + # Allow access to Google Cloud metadata service as you need to set specific headers to access it + # We don't want to block legitimate requests + if is_trusted_hostname(hostname): + return False + return any(is_imds_ip_address(ip) for ip in resolved_ip_addresses) + + +def extract_ip_array_from_results(dns_results): + """Extracts IP's from dns results""" + ip_addresses = [] + for result in dns_results: + ip_object = result[4] + if ip_object is not None and ip_object[0] is not None: + ip_addresses.append(ip_object[0]) + return ip_addresses diff --git a/aikido_firewall/vulnerabilities/ssrf/is_private_ip.py b/aikido_firewall/vulnerabilities/ssrf/is_private_ip.py new file mode 100644 index 000000000..d02506253 --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/is_private_ip.py @@ -0,0 +1,71 @@ +"""Only exports is_private_ip function""" + +import ipaddress + +# Define private IP ranges +PRIVATE_IP_RANGES = [ + "0.0.0.0/8", + "10.0.0.0/8", + "100.64.0.0/10", + "127.0.0.0/8", + "169.254.0.0/16", + "172.16.0.0/12", + "192.0.0.0/24", + "192.0.2.0/24", + "192.31.196.0/24", + "192.52.193.0/24", + "192.88.99.0/24", + "192.168.0.0/16", + "192.175.48.0/24", + "198.18.0.0/15", + "198.51.100.0/24", + "203.0.113.0/24", + "240.0.0.0/4", + "224.0.0.0/4", + "255.255.255.255/32", +] + +PRIVATE_IPV6_RANGES = [ + "::/128", # Unspecified address + "::1/128", # Loopback address + "fc00::/7", # Unique local address (ULA) + "fe80::/10", # Link-local address (LLA) + "::ffff:127.0.0.1/128", # IPv4-mapped address +] + +# Create a set to hold private IP networks +private_ip_networks = set() + +# Add private IPv4 ranges to the set +for ip_range in PRIVATE_IP_RANGES: + private_ip_networks.add(ipaddress.ip_network(ip_range)) + +# Add private IPv6 ranges to the set +for ip_range in PRIVATE_IPV6_RANGES: + private_ip_networks.add(ipaddress.ip_network(ip_range)) + + +def normalize_ip(ip): + """Normalize the IP address by removing leading zeros.""" + if not ":" in ip: + # Normalize IPv4 ip's + parts = ip.split(".") + normalized_parts = [ + str(int(part)) for part in parts + ] # Convert to int and back to str to remove leading zeros + return ".".join(normalized_parts) + return ip + + +def is_private_ip(ip): + """Returns true if the ip entered is private""" + try: + normalized_ip = normalize_ip(ip) + ip_obj = ipaddress.ip_address(normalized_ip) + if isinstance(ip_obj, ipaddress.IPv6Address) and ip_obj.ipv4_mapped: + return any(ip_obj.ipv4_mapped in network for network in private_ip_networks) + + # Check if the IP address is in any of the private networks + return any(ip_obj in network for network in private_ip_networks) + except ValueError: + return False # Return False if the IP address is invalid diff --git a/aikido_firewall/vulnerabilities/ssrf/is_private_ip_test.py b/aikido_firewall/vulnerabilities/ssrf/is_private_ip_test.py new file mode 100644 index 000000000..05b7c84f6 --- /dev/null +++ b/aikido_firewall/vulnerabilities/ssrf/is_private_ip_test.py @@ -0,0 +1,34 @@ +import pytest +from .is_private_ip import is_private_ip + + +# Test cases for is_private_ip +def test_private_ipv4_addresses(): + assert is_private_ip("192.168.1.1") is True + assert is_private_ip("10.0.0.1") is True + assert is_private_ip("172.16.0.1") is True + assert is_private_ip("127.0.0.1") is True + assert is_private_ip("169.254.1.1") is True + + +def test_public_ipv4_addresses(): + assert is_private_ip("8.8.8.8") is False + assert is_private_ip("172.15.0.1") is False + + +def test_private_ipv6_addresses(): + assert is_private_ip("::1") is True # Loopback address + assert is_private_ip("fc00::1") is True # Unique local address + assert is_private_ip("fe80::1") is True # Link-local address + + +def test_public_ipv6_addresses(): + assert is_private_ip("2001:db8::1") is False # Documentation address + assert is_private_ip("::ffff:8.8.8.8") is False # IPv4-mapped address + + +def test_invalid_addresses(): + assert is_private_ip("invalid-ip") is False + assert is_private_ip("") is False + assert is_private_ip("256.256.256.256") is False # Invalid IPv4 + assert is_private_ip("::g") is False # Invalid IPv6 diff --git a/sample-apps/flask-mysql/app.py b/sample-apps/flask-mysql/app.py index b10c8541f..76aa4237f 100644 --- a/sample-apps/flask-mysql/app.py +++ b/sample-apps/flask-mysql/app.py @@ -3,6 +3,7 @@ from flask import Flask, render_template, request from flaskext.mysql import MySQL +import requests app = Flask(__name__) if __name__ == '__main__': @@ -42,3 +43,13 @@ def create_dog(): cursor.execute(f'INSERT INTO dogs (dog_name, isAdmin) VALUES ("%s", 0)' % (dog_name)) connection.commit() return f'Dog {dog_name} created successfully' + +@app.route("/request", methods=['GET']) +def show_request_page(): + return render_template('request.html') + +@app.route("/request", methods=['POST']) +def make_request(): + url = request.form['url'] + res = requests.get(url) + return str(res) diff --git a/sample-apps/flask-mysql/templates/request.html b/sample-apps/flask-mysql/templates/request.html new file mode 100644 index 000000000..cffcd9af1 --- /dev/null +++ b/sample-apps/flask-mysql/templates/request.html @@ -0,0 +1,17 @@ + + +
+ + + +