Skip to content

AIK-3168 : Pour sql_injection vulnerability code over to python firewall (including testing) #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 57 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
ec73f29
Copy over payloads/ folder
Jul 17, 2024
9f99b7c
Add the 3 dialects with more extensive testing
Jul 17, 2024
292bec6
Add the constants and their tests
Jul 17, 2024
88423b4
Copy over the "contains_user_input" function
Jul 17, 2024
576ce20
Add __init__.py to vulns so it becomes a module
Jul 17, 2024
3384b2c
Create escape_string_regexp helper (broken atm)
Jul 17, 2024
56212bf
Fix helper escape_string_regexp, it now works
Jul 17, 2024
d612086
Fix too long line
Jul 17, 2024
5a47368
Linting
Jul 17, 2024
39cd21e
Start of the copy of userinput_contains_sql_syntax function
Jul 17, 2024
1b96725
Add dictionary cached_regexes
Jul 17, 2024
f72d2fd
Copy over tests, still partially broken
Jul 17, 2024
cada223
Fix bugs with userinput_contains_sql_syntax and it's tests
Jul 17, 2024
0d8da2e
Add some boilerplates for the other functions that need to be copied …
Jul 17, 2024
f6788e4
Linting and moving query_contains_user_input into it's own seperate file
Jul 17, 2024
8e20005
AI Copy over/start copying over different SQL modules
Jul 17, 2024
8378756
Linting
Jul 18, 2024
baa2c3e
Add try_decode_as_jwt and it's tests
Jul 18, 2024
716c99d
Maybe shouldn't forget linting haha
Jul 18, 2024
569d657
Merge branch 'main' into AIK-3168
Jul 18, 2024
101617b
Fix broken lockfile
Jul 18, 2024
83c885c
Add new helper function get_current_and_next_segments
Jul 18, 2024
6e85c03
Integrate the new helper function with userinput_occ code
Jul 18, 2024
2854b4d
Fix some bugs with userinput_occ_safely_enc
Jul 18, 2024
a2f0903
Update userinput_occurences_safely_encapsulated
Jul 18, 2024
14405fe
Add missing comments to userinput_contains_sql_syntax
Jul 18, 2024
dccc164
Rename userinput_occurrences_safely_encapsulated -> uinput_occ_safely…
Jul 18, 2024
e7d7826
Rename userinput_occurrences_safely_encapsulated -> uinput_occ_safely…
Jul 18, 2024
73e7bd1
Linting and extra comments
Jul 18, 2024
41c2ee5
Merge branch 'AIK-3168' of github.com:AikidoSec/firewall-python into …
Jul 18, 2024
c13aa3c
Linting
Jul 18, 2024
31a2b35
this really is annoying pylint behaviour
Jul 18, 2024
fd00efb
Add is_plain_object function and tests
Jul 18, 2024
5fbdb5f
Add new function build_path_to_payload with tests
Jul 18, 2024
a1ecd73
Add extract_strings_from_user_input and it's tests
Jul 18, 2024
336ff8d
Update how the context gets parsed, for json stuff
Jul 18, 2024
ed329c1
Remove unused variable e
Jul 18, 2024
6cfd882
pymysql call check_context_for_sql_injection
Jul 18, 2024
e98713e
Add some basic logging for now to check_context_for_sql_injection and…
Jul 18, 2024
ccb80dc
Linting
Jul 18, 2024
fc137e9
More logging stuff
Jul 18, 2024
6c5de54
Add debug logs and compile all regexes on spot
Jul 18, 2024
80faba3
Fix a bug with how the testing reads lines from the file
Jul 19, 2024
ea34822
Extra debugging
Jul 19, 2024
5f071c2
Linting
Jul 19, 2024
4a2ef77
Update sql_injection algo for user input to use compiled regex with f…
Jul 19, 2024
b0d66f2
fix bug in regexes for this file
Jul 19, 2024
02ac7e6
import regex as re as in the other parts of the codebase
Jul 19, 2024
a7975be
Add some comments and follow more pylint patterns
Jul 19, 2024
26a513f
Extra pylint stuff (score should be at 9.76
Jul 19, 2024
f9db79c
Use more sources
Jul 19, 2024
efd4216
Merge remote-tracking branch 'origin/main' into AIK-3168
Jul 19, 2024
136076f
Update tests for context
Jul 19, 2024
8f97c45
Also pass along path to the payload
Jul 19, 2024
3b24bfa
Update logging in pymysql.py
Jul 19, 2024
6534f78
Add some tests for js_slice
Jul 19, 2024
85e7d13
Merge branch 'main' into AIK-3168
bitterpanda63 Jul 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ disable = unused-argument
ignore-patterns=(.)*_test\.py,test_(.)*\.py

[MESSAGES CONTROL]
# Disable the no-member warning [Currently enabled]
#disable = no-member
disable=too-few-public-methods
8 changes: 8 additions & 0 deletions aikido_firewall/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""
init file for the helpers/ folder, imports helper function for easier accessibility
"""

from aikido_firewall.helpers.escape_string_regexp import escape_string_regexp
from aikido_firewall.helpers.get_current_and_next_segments import (
get_current_and_next_segments,
)
23 changes: 23 additions & 0 deletions aikido_firewall/helpers/build_path_to_payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""
Helper function file, see funtion definition
"""


def build_path_to_payload(path_to_payload):
"""
Create a string so people see the path to where
the injection took place
"""
if len(path_to_payload) == 0:
return "."

result = ""
for part in path_to_payload:
if part["type"] == "object":
result += "." + part["key"]
elif part["type"] == "array":
result += f".[{part['index']}]"
elif part["type"] == "jwt":
result += "<jwt>"

return result
41 changes: 41 additions & 0 deletions aikido_firewall/helpers/build_path_to_payload_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pytest
from aikido_firewall.helpers.build_path_to_payload import build_path_to_payload


def test_build_path_to_payload_empty():
assert build_path_to_payload([]) == "."


def test_build_path_to_payload_single_object():
path = [{"type": "object", "key": "name"}]
assert build_path_to_payload(path) == ".name"


def test_build_path_to_payload_single_array():
path = [{"type": "array", "index": 0}]
assert build_path_to_payload(path) == ".[0]"


def test_build_path_to_payload_single_jwt():
path = [{"type": "jwt"}]
assert build_path_to_payload(path) == "<jwt>"


def test_build_path_to_payload_mixed_types():
path = [
{"type": "object", "key": "user"},
{"type": "array", "index": 2},
{"type": "jwt"},
{"type": "object", "key": "details"},
{"type": "array", "index": 1},
]
assert build_path_to_payload(path) == ".user.[2]<jwt>.details.[1]"


def test_build_path_to_payload_multiple_objects():
path = [
{"type": "object", "key": "user"},
{"type": "object", "key": "details"},
{"type": "object", "key": "address"},
]
assert build_path_to_payload(path) == ".user.details.address"
17 changes: 17 additions & 0 deletions aikido_firewall/helpers/escape_string_regexp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""
Helper function file, see funtion definition
"""

import re


def escape_string_regexp(string):
"""
Escape characters with special meaning either inside or outside character sets.
Use a simple backslash escape when it's always valid, and a '\\xnn' escape
when the simpler form would be disallowed by Unicode patterns' stricter grammar.
Taken from https://github.com/sindresorhus/escape-string-regexp/
"""
pattern = re.compile(r"[|\\{}()[\]^$+*?.]")
replace1 = re.sub(pattern, r"\\\g<0>", string)
return re.sub(r"-", r"\\x2d", replace1)
18 changes: 18 additions & 0 deletions aikido_firewall/helpers/escape_string_regexp_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import re
import pytest
from aikido_firewall.helpers import escape_string_regexp


def test_escape_string_regexp():
assert (
escape_string_regexp("\\ ^ $ * + ? . ( ) | { } [ ]")
== "\\\\ \\^ \\$ \\* \\+ \\? \\. \\( \\) \\| \\{ \\} \\[ \\]"
)


def test_escape_hyphen_pcre():
assert escape_string_regexp("foo - bar") == "foo \\x2d bar"


def test_escape_hyphen_unicode_flag():
assert re.match(escape_string_regexp("-"), "-u") is not None
42 changes: 42 additions & 0 deletions aikido_firewall/helpers/extract_strings_from_user_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
Helper function file, see funtion definition
"""

from aikido_firewall.helpers.try_decode_as_jwt import try_decode_as_jwt
from aikido_firewall.helpers.build_path_to_payload import build_path_to_payload


def extract_strings_from_user_input(obj, path_to_payload=None):
"""
Extracts strings from an object (user input)
"""
if path_to_payload is None:
path_to_payload = []

results = {}

if isinstance(obj, dict):
for key, value in obj.items():
results[key] = build_path_to_payload(path_to_payload)
for k, v in extract_strings_from_user_input(
value, path_to_payload + [{"type": "object", "key": key}]
).items():
results[k] = v

if isinstance(obj, list):
for i, value in enumerate(obj):
for k, v in extract_strings_from_user_input(
value, path_to_payload + [{"type": "array", "index": i}]
).items():
results[k] = v

if isinstance(obj, str):
results[obj] = build_path_to_payload(path_to_payload)
jwt = try_decode_as_jwt(obj)
if jwt[0]:
for k, v in extract_strings_from_user_input(
jwt[1], path_to_payload + [{"type": "jwt"}]
).items():
results[k] = v

return results
96 changes: 96 additions & 0 deletions aikido_firewall/helpers/extract_strings_from_user_input_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import pytest
from aikido_firewall.helpers.extract_strings_from_user_input import (
extract_strings_from_user_input,
)


def from_obj(obj):
return dict(obj)


def test_empty_object_returns_empty_dict():
assert extract_strings_from_user_input({}) == from_obj({})


def test_extract_query_objects():
assert extract_strings_from_user_input({"age": {"$gt": "21"}}) == from_obj(
{"age": ".", "$gt": ".age", "21": ".age.$gt"}
)
assert extract_strings_from_user_input({"title": {"$ne": "null"}}) == from_obj(
{"title": ".", "$ne": ".title", "null": ".title.$ne"}
)
assert extract_strings_from_user_input(
{"age": "whaat", "user_input": ["whaat", "dangerous"]}
) == from_obj(
{
"user_input": ".",
"age": ".",
"whaat": ".user_input.[0]",
"dangerous": ".user_input.[1]",
}
)


def test_extract_cookie_objects():
assert extract_strings_from_user_input(
{"session": "ABC", "session2": "DEF"}
) == from_obj(
{"session2": ".", "session": ".", "ABC": ".session", "DEF": ".session2"}
)
assert extract_strings_from_user_input(
{"session": "ABC", "session2": 1234}
) == from_obj({"session2": ".", "session": ".", "ABC": ".session"})


def test_extract_header_objects():
assert extract_strings_from_user_input(
{"Content-Type": "application/json"}
) == from_obj({"Content-Type": ".", "application/json": ".Content-Type"})
assert extract_strings_from_user_input({"Content-Type": 54321}) == from_obj(
{"Content-Type": "."}
)
assert extract_strings_from_user_input(
{"Content-Type": "application/json", "ExtraHeader": "value"}
) == from_obj(
{
"Content-Type": ".",
"application/json": ".Content-Type",
"ExtraHeader": ".",
"value": ".ExtraHeader",
}
)


def test_extract_body_objects():
assert extract_strings_from_user_input(
{"nested": {"nested": {"$ne": None}}}
) == from_obj({"nested": ".nested", "$ne": ".nested.nested"})
assert extract_strings_from_user_input(
{"age": {"$gt": "21", "$lt": "100"}}
) == from_obj(
{"age": ".", "$lt": ".age", "$gt": ".age", "21": ".age.$gt", "100": ".age.$lt"}
)


def test_decodes_jwts():
assert extract_strings_from_user_input(
{
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwidXNlcm5hbWUiOnsiJG5lIjpudWxsfSwiaWF0IjoxNTE2MjM5MDIyfQ._jhGJw9WzB6gHKPSozTFHDo9NOHs3CNOlvJ8rWy6VrQ"
}
) == from_obj(
{
"token": ".",
"iat": ".token<jwt>",
"username": ".token<jwt>",
"sub": ".token<jwt>",
"1234567890": ".token<jwt>.sub",
"$ne": ".token<jwt>.username",
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwidXNlcm5hbWUiOnsiJG5lIjpudWxsfSwiaWF0IjoxNTE2MjM5MDIyfQ._jhGJw9WzB6gHKPSozTFHDo9NOHs3CNOlvJ8rWy6VrQ": ".token",
}
)


def test_jwt_as_string():
assert extract_strings_from_user_input(
{"header": "/;ping%20localhost;.e30=."}
) == from_obj({"header": ".", "/;ping%20localhost;.e30=.": ".header"})
8 changes: 8 additions & 0 deletions aikido_firewall/helpers/get_current_and_next_segments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""
Helper function file, see function definition for more info
"""


def get_current_and_next_segments(array):
"""Get the current and next segments of an array"""
return [(array[i], array[i + 1]) for i in range(len(array) - 1)]
28 changes: 28 additions & 0 deletions aikido_firewall/helpers/get_current_and_next_segments_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import pytest
from aikido_firewall.helpers.get_current_and_next_segments import (
get_current_and_next_segments,
)


def test_empty_array():
assert get_current_and_next_segments([]) == []


def test_single_item_array():
assert get_current_and_next_segments(["a"]) == []


def test_two_item_array():
assert get_current_and_next_segments(["a", "b"]) == [("a", "b")]


def test_three_item_array():
assert get_current_and_next_segments(["a", "b", "c"]) == [("a", "b"), ("b", "c")]


def test_four_item_array():
assert get_current_and_next_segments(["a", "b", "c", "d"]) == [
("a", "b"),
("b", "c"),
("c", "d"),
]
11 changes: 11 additions & 0 deletions aikido_firewall/helpers/is_plain_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""
Helper function file, see funtion definition
"""


def is_plain_object(o):
"""
Checks if the object is a plain object,
i.e. a dictionary in Python
"""
return str(type(o)) == "<class 'dict'>"
34 changes: 34 additions & 0 deletions aikido_firewall/helpers/is_plain_object_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pytest
from aikido_firewall.helpers.is_plain_object import is_plain_object


def test_is_plain_object_true():
assert is_plain_object({}) == True
assert is_plain_object({"foo": "bar"}) == True


def test_is_plain_object_false():
class Foo:
def __init__(self):
self.abc = {}

foo_instance = Foo()

assert is_plain_object("/foo/") == False
assert is_plain_object(lambda: None) == False
assert is_plain_object(1) == False
assert is_plain_object(["foo", "bar"]) == False
assert is_plain_object([]) == False
assert is_plain_object(foo_instance) == False
assert is_plain_object(None) == False


def test_is_plain_object_modified_prototype():
class CustomConstructor:
pass

CustomConstructor.prototype = list

instance = CustomConstructor()

assert is_plain_object(instance) == False
25 changes: 25 additions & 0 deletions aikido_firewall/helpers/try_decode_as_jwt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""
Helper function file, see function docstrin
"""

import base64
import json


def try_decode_as_jwt(jwt):
"""
This will try to decode a string as a JWT
"""
if "." not in jwt:
return (False, None)

parts = jwt.split(".")

if len(parts) != 3:
return (False, None)

try:
jwt = json.loads(base64.b64decode(str.encode(parts[1]) + b"==").decode("utf-8"))
return (True, jwt)
except Exception:
return (False, None)
Loading
Loading