From 195d0d07aa32ce78e13a5dc59bcb1d181ed6d3fc Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Wed, 7 Aug 2024 08:15:52 +0200 Subject: [PATCH 1/2] Add get_ip_from_request helper function and the test cases --- .../helpers/get_ip_form_request_test.py | 100 ++++++++++++++++++ .../helpers/get_ip_from_request.py | 69 ++++++++++++ 2 files changed, 169 insertions(+) create mode 100644 aikido_firewall/helpers/get_ip_form_request_test.py create mode 100644 aikido_firewall/helpers/get_ip_from_request.py diff --git a/aikido_firewall/helpers/get_ip_form_request_test.py b/aikido_firewall/helpers/get_ip_form_request_test.py new file mode 100644 index 000000000..9a20b2f8f --- /dev/null +++ b/aikido_firewall/helpers/get_ip_form_request_test.py @@ -0,0 +1,100 @@ +import pytest +from .get_ip_from_request import ( + get_ip_from_request, + is_ip, + get_client_ip_from_x_forwarded_for, +) + + +# Test `get_ip_from_request` function : +def test_get_ip_from_request(): + # Test case 1: Valid X-Forwarded-For header with valid IP + headers = {"X-Forwarded-For": "192.168.1.1, 10.0.0.1"} + assert get_ip_from_request(None, headers) == "192.168.1.1" + + # Test case 2: Valid X-Forwarded-For header with invalid IPs + headers = {"X-Forwarded-For": "256.256.256.256, 192.168.1.1"} + assert ( + get_ip_from_request(None, headers) == "192.168.1.1" + ) # Should return the valid IP + + # Test case 3: Valid remote address + headers = {} + assert get_ip_from_request("10.0.0.1", headers) == "10.0.0.1" + + # Test case 4: Valid remote address with invalid X-Forwarded-For + headers = {"X-Forwarded-For": "abc.def.ghi.jkl, 256.256.256.256"} + assert ( + get_ip_from_request("10.0.0.1", headers) == "10.0.0.1" + ) # Should return the remote address + + # Test case 5: Both X-Forwarded-For and remote address are invalid + headers = {"X-Forwarded-For": "abc.def.ghi.jkl, 256.256.256.256"} + assert get_ip_from_request(None, headers) is None # Should return None + + # Test case 6: Empty headers and remote address + headers = {} + assert get_ip_from_request(None, headers) is None # Should return None + + +# Test `is_ip` function : +def test_valid_ipv4(): + assert is_ip("192.168.1.1") # Valid IPv4 + assert is_ip("255.255.255.255") # Valid IPv4 + assert is_ip("0.0.0.0") # Valid IPv4 + + +def test_invalid_ipv4(): + assert not is_ip("256.256.256.256") # Invalid IPv4 + assert not is_ip("192.168.1") # Invalid IPv4 + assert not is_ip("abc.def.ghi.jkl") # Invalid IPv4 + + +def test_valid_ipv6(): + assert is_ip("::1") # Valid IPv6 (loopback) + assert is_ip("2001:0db8:85a3:0000:0000:8a2e:0370:7334") # Valid IPv6 + + +def test_invalid_ipv6(): + assert not is_ip("2001:db8:85a3::8a2e:370:7334:12345") # Invalid IPv6 + assert not is_ip("::g") # Invalid IPv6 + + +# Test `get_client_ip_from_x_forwarded_for` function : +def test_get_client_ip_from_x_forwarded_for(): + # Test cases with valid IPs + assert get_client_ip_from_x_forwarded_for("192.168.1.1") == "192.168.1.1" + assert get_client_ip_from_x_forwarded_for("192.168.1.1, 10.0.0.1") == "192.168.1.1" + assert get_client_ip_from_x_forwarded_for("10.0.0.1, 192.168.1.1") == "10.0.0.1" + assert ( + get_client_ip_from_x_forwarded_for("2001:0db8:85a3:0000:0000:8a2e:0370:7334") + == "2001:0db8:85a3:0000:0000:8a2e:0370:7334" + ) + assert ( + get_client_ip_from_x_forwarded_for( + "2001:0db8:85a3:0000:0000:8a2e:0370:7334, 192.168.1.1" + ) + == "2001:0db8:85a3:0000:0000:8a2e:0370:7334" + ) + + # Test cases with mixed valid and invalid IPs + assert ( + get_client_ip_from_x_forwarded_for("256.256.256.256, 192.168.1.1") + == "192.168.1.1" + ) # Invalid IPv4 ignored + assert ( + get_client_ip_from_x_forwarded_for("192.168.1.1, abc.def.ghi.jkl") + == "192.168.1.1" + ) # Invalid IPv4 ignored + assert ( + get_client_ip_from_x_forwarded_for( + "::1, 2001:0db8:85a3:0000:0000:8a2e:0370:7334" + ) + == "::1" + ) # Valid IPv6 preferred + + # Test cases with only invalid IPs + assert ( + get_client_ip_from_x_forwarded_for("abc.def.ghi.jkl, 256.256.256.256") is None + ) # All invalid + assert get_client_ip_from_x_forwarded_for("") is None # Empty string diff --git a/aikido_firewall/helpers/get_ip_from_request.py b/aikido_firewall/helpers/get_ip_from_request.py new file mode 100644 index 000000000..f6079d028 --- /dev/null +++ b/aikido_firewall/helpers/get_ip_from_request.py @@ -0,0 +1,69 @@ +""" +Mainly exports the `get_ip_from_request` function +""" + +import socket +import os + + +def get_ip_from_request(remote_address, headers): + """ + Tries and get the IP address from the request, checking for x-forwarded-for + """ + if headers: + lower_headers = {key.lower(): value for key, value in headers.items()} + if "x-forwarded-for" in lower_headers and trust_proxy(): + x_forwarded_for = get_client_ip_from_x_forwarded_for( + lower_headers["x-forwarded-for"] + ) + + if x_forwarded_for and is_ip(x_forwarded_for): + return x_forwarded_for + + if remote_address and is_ip(remote_address): + return remote_address + + return None + + +def get_client_ip_from_x_forwarded_for(value): + """ + Fetches the IP out of the X-Forwarder-For headers + """ + forwarded_ips = [ip.strip() for ip in value.split(",")] + + for ip in forwarded_ips: + if ":" in ip: + parts = ip.split(":") + if len(parts) == 2: + return parts[0] + + for ip in forwarded_ips: + if is_ip(ip): + return ip + + return None + + +def trust_proxy(): + """ + Checks the enviornment variables for `AIKIDO_TRUST_PROXY` + """ + return not "AIKIDO_TRUST_PROXY" in os.environ or os.environ[ + "AIKIDO_TRUST_PROXY" + ] in ["1", "true"] + + +def is_ip(value): + """ + Checks if `value` is a vlid IPv4 or IPv6 ip + """ + try: + socket.inet_pton(socket.AF_INET, value) # Check for IPv4 + return True + except socket.error: + try: + socket.inet_pton(socket.AF_INET6, value) # Check for IPv6 + return True + except socket.error: + return False From c90fbcf134f68921ff1637f1ce69f5ddf9b71f8e Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Wed, 7 Aug 2024 09:22:43 +0200 Subject: [PATCH 2/2] Use the get_ip_from_request function in context --- aikido_firewall/context/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/aikido_firewall/context/__init__.py b/aikido_firewall/context/__init__.py index cc44696a0..7d962d3f0 100644 --- a/aikido_firewall/context/__init__.py +++ b/aikido_firewall/context/__init__.py @@ -7,6 +7,7 @@ from http.cookies import SimpleCookie from aikido_firewall.helpers.build_route_from_url import build_route_from_url from aikido_firewall.helpers.get_subdomains_from_url import get_subdomains_from_url +from aikido_firewall.helpers.get_ip_from_request import get_ip_from_request SUPPORTED_SOURCES = ["django", "flask", "django-gunicorn"] UINPUT_SOURCES = ["body", "cookies", "query", "headers"] @@ -70,10 +71,11 @@ def __init__(self, context_obj=None, req=None, source=None): self.set_django_gunicorn_attrs(req) self.route = build_route_from_url(self.url) self.subdomains = get_subdomains_from_url(self.url) + self.remote_address = get_ip_from_request(self.raw_ip, self.headers) def set_django_gunicorn_attrs(self, req): """Set properties that are specific to django-gunicorn""" - self.remote_address = req.remote_addr + self.raw_ip = req.remote_addr self.url = req.uri self.body = parse_qs(req.body_copy.decode("utf-8")) self.query = parse_qs(req.query) @@ -82,7 +84,7 @@ def set_django_gunicorn_attrs(self, req): def set_django_attrs(self, req): """set properties that are specific to django""" - self.remote_address = req.META.get("REMOTE_ADDR") + self.raw_ip = req.META.get("REMOTE_ADDR") self.url = req.build_absolute_uri() self.body = dict(req.POST) self.query = dict(req.GET) @@ -90,7 +92,7 @@ def set_django_attrs(self, req): def set_flask_attrs(self, req): """Set properties that are specific to flask""" - self.remote_address = req.remote_addr + self.raw_ip = req.remote_addr self.url = req.url if req.is_json: self.body = req.json