diff --git a/aikido_zen/background_process/packages.py b/aikido_zen/background_process/packages.py index 19425e37a..e8712e89f 100644 --- a/aikido_zen/background_process/packages.py +++ b/aikido_zen/background_process/packages.py @@ -39,7 +39,7 @@ def is_package_compatible(package=None, required_version=ANY_VERSION, packages=N return True # No match found - logger.info("Zen does not support %s", packages) + logger.info("Zen does not support current version of %s", "/".join(packages)) return False except Exception as e: logger.debug("Exception occurred in is_package_compatible: %s", e) diff --git a/aikido_zen/sinks/__init__.py b/aikido_zen/sinks/__init__.py index 3cd752ab6..a551ef8af 100644 --- a/aikido_zen/sinks/__init__.py +++ b/aikido_zen/sinks/__init__.py @@ -1,5 +1,4 @@ from wrapt import wrap_object, FunctionWrapper, when_imported - from aikido_zen.background_process.packages import ANY_VERSION, is_package_compatible from aikido_zen.errors import AikidoException from aikido_zen.helpers.logging import logger @@ -12,10 +11,17 @@ def on_import(name, package="", version_requirement=ANY_VERSION): """ def decorator(func): - if package and not is_package_compatible(package, version_requirement): - return + def check_pkg_wrapper(f): + def wrapper(*args, **kwargs): + # This code runs only on import + if package and not is_package_compatible(package, version_requirement): + return + return f(*args, **kwargs) + + return wrapper - when_imported(name)(func) # Register the function to be called on import + # Register the function to be called on import + when_imported(name)(check_pkg_wrapper(func)) return decorator diff --git a/aikido_zen/sinks/asyncpg.py b/aikido_zen/sinks/asyncpg.py index bfca59fc0..3c24de653 100644 --- a/aikido_zen/sinks/asyncpg.py +++ b/aikido_zen/sinks/asyncpg.py @@ -2,64 +2,28 @@ Sink module for `asyncpg` """ -import copy -import aikido_zen.importhook as importhook -from aikido_zen.background_process.packages import is_package_compatible import aikido_zen.vulnerabilities as vulns -from aikido_zen.helpers.logging import logger +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import patch_function, before, on_import -REQUIRED_ASYNCPG_VERSION = "0.27.0" - -@importhook.on_import("asyncpg.connection") -def on_asyncpg_import(asyncpg): +@on_import("asyncpg.connection", "asyncpg", version_requirement="0.27.0") +def patch(m): """ - Hook 'n wrap on `asyncpg.connection` - * the Cursor classes in asyncpg.cursor are only used to fetch data. (Currently not supported) - * Pool class uses Connection class (Wrapping supported for Connection class) - * _execute(...) get's called by all except execute and executemany - Our goal is to wrap the _execute(), execute(), executemany() functions in Connection class : - https://github.com/MagicStack/asyncpg/blob/85d7eed40637e7cad73a44ed2439ffeb2a8dc1c2/asyncpg/connection.py#L43 - Returns : Modified asyncpg.connection object + patching module asyncpg.connection + - patches Connection.execute, Connection.executemany, Connection._execute + - doesn't patch Cursor class -> are only used to fetch data. + - doesn't patch Pool class -> uses Connection class + src: https://github.com/MagicStack/asyncpg/blob/85d7eed40637e7cad73a44ed2439ffeb2a8dc1c2/asyncpg/connection.py#L43 """ - if not is_package_compatible("asyncpg", REQUIRED_ASYNCPG_VERSION): - return asyncpg - modified_asyncpg = importhook.copy_module(asyncpg) - - # pylint: disable=protected-access # We need to wrap this function - former__execute = copy.deepcopy(asyncpg.Connection._execute) - former_executemany = copy.deepcopy(asyncpg.Connection.executemany) - former_execute = copy.deepcopy(asyncpg.Connection.execute) - - def aikido_new__execute(_self, query, *args, **kwargs): - vulns.run_vulnerability_scan( - kind="sql_injection", - op="asyncpg.connection.Connection._execute", - args=(query, "postgres"), - ) - - return former__execute(_self, query, *args, **kwargs) - - def aikido_new_executemany(_self, query, *args, **kwargs): - # This query is just a string, not a list, see docs. - vulns.run_vulnerability_scan( - kind="sql_injection", - op="asyncpg.connection.Connection.executemany", - args=(query, "postgres"), - ) - return former_executemany(_self, query, *args, **kwargs) + patch_function(m, "Connection.execute", _execute) + patch_function(m, "Connection.executemany", _execute) + patch_function(m, "Connection._execute", _execute) - def aikido_new_execute(_self, query, *args, **kwargs): - vulns.run_vulnerability_scan( - kind="sql_injection", - op="asyncpg.connection.Connection.execute", - args=(query, "postgres"), - ) - return former_execute(_self, query, *args, **kwargs) - # pylint: disable=no-member - setattr(asyncpg.Connection, "_execute", aikido_new__execute) - setattr(asyncpg.Connection, "executemany", aikido_new_executemany) - setattr(asyncpg.Connection, "execute", aikido_new_execute) +@before +def _execute(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "query") - return modified_asyncpg + op = f"asyncpg.connection.Connection.{func.__name__}" + vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "postgres")) diff --git a/aikido_zen/sinks/builtins.py b/aikido_zen/sinks/builtins.py index 70167e45b..c1dd54da0 100644 --- a/aikido_zen/sinks/builtins.py +++ b/aikido_zen/sinks/builtins.py @@ -3,34 +3,26 @@ """ from pathlib import PurePath -import aikido_zen.importhook as importhook import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import patch_function, on_import, before -def aikido_open_decorator(func): - """Decorator for open(...)""" +@before +def _open(func, instance, args, kwargs): + filename = get_argument(args, kwargs, 0, "filename") + if not isinstance(filename, (str, bytes, PurePath)): + return - def wrapper(*args, **kwargs): - # args[0] is thefunc_name filename - if len(args) > 0 and isinstance(args[0], (str, bytes, PurePath)): - vulns.run_vulnerability_scan( - kind="path_traversal", op="builtins.open", args=(args[0],) - ) - return func(*args, **kwargs) + vulns.run_vulnerability_scan( + kind="path_traversal", op="builtins.open", args=(filename,) + ) - return wrapper - -@importhook.on_import("builtins") -def on_builtins_import(builtins): +@on_import("builtins") +def patch(m): """ - Hook 'n wrap on `builtins`, python's built-in functions - Our goal is to wrap the open() function, which you use when opening files - Returns : Modified builtins object + patching module builtins + - patches builtins.open """ - modified_builtins = importhook.copy_module(builtins) - - # pylint: disable=no-member - setattr(builtins, "open", aikido_open_decorator(builtins.open)) - setattr(modified_builtins, "open", aikido_open_decorator(builtins.open)) - return modified_builtins + patch_function(m, "open", _open) diff --git a/aikido_zen/sinks/http_client.py b/aikido_zen/sinks/http_client.py index f41203966..d8e495ed2 100644 --- a/aikido_zen/sinks/http_client.py +++ b/aikido_zen/sinks/http_client.py @@ -2,49 +2,35 @@ Sink module for `http` """ -import copy -import aikido_zen.importhook as importhook -from aikido_zen.helpers.logging import logger -from aikido_zen.vulnerabilities import run_vulnerability_scan +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import before, after, patch_function, on_import from aikido_zen.vulnerabilities.ssrf.handle_http_response import ( handle_http_response, ) from aikido_zen.helpers.try_parse_url import try_parse_url -from aikido_zen.errors import AikidoException -@importhook.on_import("http.client") -def on_http_import(http): - """ - Hook 'n wrap on `http.client.HTTPConnection.putrequest` - Our goal is to wrap the putrequest() function of the HTTPConnection class : - https://github.com/python/cpython/blob/372df1950817dfcf8b9bac099448934bf8657cf5/Lib/http/client.py#L1136 - Returns : Modified http.client object - """ - modified_http = importhook.copy_module(http) - former_putrequest = copy.deepcopy(http.HTTPConnection.putrequest) - former_getresponse = copy.deepcopy(http.HTTPConnection.getresponse) +@before +def _putrequest(func, instance, args, kwargs): + # putrequest(...) is called with path argument, store this on the HTTPConnection + # instance for later use in the getresponse(...) function. + path = get_argument(args, kwargs, 1, "path") + setattr(instance, "_aikido_var_path", path) + - def aik_new_putrequest(_self, method, path, *args, **kwargs): - # Aikido putrequest, gets called before the request goes through - # Set path for aik_new_getresponse : - _self.aikido_attr_path = path - return former_putrequest(_self, method, path, *args, **kwargs) +@after +def _getresponse(func, instance, args, kwargs, return_value): + path = getattr(instance, "_aikido_var_path") + source_url = try_parse_url(f"http://{instance.host}:{instance.port}{path}") + handle_http_response(http_response=return_value, source=source_url) - def aik_new_getresponse(_self): - # Aikido getresponse, gets called after the request is complete - # And fetches the response - response = former_getresponse(_self) - try: - assembled_url = f"http://{_self.host}:{_self.port}{_self.aikido_attr_path}" - source_url = try_parse_url(assembled_url) - handle_http_response(http_response=response, source=source_url) - except Exception as e: - logger.debug("Exception occurred in custom getresponse function : %s", e) - return response - # pylint: disable=no-member - setattr(http.HTTPConnection, "putrequest", aik_new_putrequest) - # pylint: disable=no-member - setattr(http.HTTPConnection, "getresponse", aik_new_getresponse) - return modified_http +@on_import("http.client") +def patch(m): + """ + patching module http.client + - patches HTTPConnection.putrequest -> stores path + - patches HTTPConnection.getresponse -> handles response object + """ + patch_function(m, "HTTPConnection.putrequest", _putrequest) + patch_function(m, "HTTPConnection.getresponse", _getresponse) diff --git a/aikido_zen/sinks/io.py b/aikido_zen/sinks/io.py index 15966189b..48d543080 100644 --- a/aikido_zen/sinks/io.py +++ b/aikido_zen/sinks/io.py @@ -2,34 +2,35 @@ Sink module for python's `io` """ -import copy -import aikido_zen.importhook as importhook import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import patch_function, before, on_import -KIND = "path_traversal" +@before +def _open(func, instance, args, kwargs): + file = get_argument(args, kwargs, 0, "file") + if not file: + return + + vulns.run_vulnerability_scan(kind="path_traversal", op="io.open", args=(file,)) -@importhook.on_import("io") -def on_io_import(io): - """ - Hook 'n wrap on `io`, wrapping io.open(...) and io.open_code(...) - Returns : Modified io object - """ - modified_io = importhook.copy_module(io) - former_open_func = copy.deepcopy(io.open) - former_open_code_func = copy.deepcopy(io.open_code) - def aikido_open_func(file, *args, **kwargs): - if file: - vulns.run_vulnerability_scan(kind=KIND, op="io.open", args=(file,)) - return former_open_func(file, *args, **kwargs) +@before +def _open_code(func, instance, args, kwargs): + path = get_argument(args, kwargs, 0, "path") + if not path: + return - def aikido_open_code_func(path): - if path: - vulns.run_vulnerability_scan(kind=KIND, op="io.open_code", args=(path,)) - return former_open_code_func(path) + vulns.run_vulnerability_scan(kind="path_traversal", op="io.open_code", args=(path,)) - setattr(modified_io, "open", aikido_open_func) - setattr(modified_io, "open_code", aikido_open_code_func) - return modified_io +@on_import("io") +def patch(m): + """ + patching module io + - patches io.open(file, ...) + - patches io.open_code(path) + """ + patch_function(m, "open", _open) + patch_function(m, "open_code", _open_code) diff --git a/aikido_zen/sinks/mysqlclient.py b/aikido_zen/sinks/mysqlclient.py index ca712aca3..5de3b91ff 100644 --- a/aikido_zen/sinks/mysqlclient.py +++ b/aikido_zen/sinks/mysqlclient.py @@ -2,43 +2,38 @@ Sink module for `mysqlclient` """ -import copy -import aikido_zen.importhook as importhook -from aikido_zen.background_process.packages import is_package_compatible -from aikido_zen.helpers.logging import logger +from aikido_zen.helpers.get_argument import get_argument import aikido_zen.vulnerabilities as vulns +from aikido_zen.sinks import patch_function, on_import, before -REQUIRED_MYSQLCLIENT_VERSION = "1.5.0" - -@importhook.on_import("MySQLdb.cursors") -def on_mysqlclient_import(mysql): +@on_import("MySQLdb.cursors", "mysqlclient", version_requirement="1.5.0") +def patch(m): """ - Hook 'n wrap on `MySQLdb.cursors` - Our goal is to wrap the query() function of the Connection class : - https://github.com/PyMySQL/mysqlclient/blob/9fd238b9e3105dcbed2b009a916828a38d1f0904/src/MySQLdb/connections.py#L257 - Returns : Modified MySQLdb.connections object + patching MySQLdb.cursors (mysqlclient) + - patches Cursor.execute(query, ...) + - patches Cursor.executemany(query, ...) """ - if not is_package_compatible("mysqlclient", REQUIRED_MYSQLCLIENT_VERSION): - return mysql - modified_mysql = importhook.copy_module(mysql) - prev_execute_func = copy.deepcopy(mysql.Cursor.execute) - prev_executemany_func = copy.deepcopy(mysql.Cursor.executemany) - - def aikido_new_execute(self, query, args=None): - if isinstance(query, bytearray): - logger.debug("Query is bytearray, normally comes from executemany.") - return prev_execute_func(self, query, args) - vulns.run_vulnerability_scan( - kind="sql_injection", op="MySQLdb.Cursor.execute", args=(query, "mysql") - ) - return prev_execute_func(self, query, args) - - def aikido_new_executemany(self, query, args): - op = "MySQLdb.Cursor.executemany" - vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "mysql")) - return prev_executemany_func(self, query, args) - - setattr(mysql.Cursor, "execute", aikido_new_execute) - setattr(mysql.Cursor, "executemany", aikido_new_executemany) - return modified_mysql + patch_function(m, "Cursor.execute", _execute) + patch_function(m, "Cursor.executemany", _executemany) + + +@before +def _execute(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "query") + if isinstance(query, bytearray): + # If query is type bytearray, it will be picked up by our wrapping of executemany + return + + vulns.run_vulnerability_scan( + kind="sql_injection", op="MySQLdb.Cursor.execute", args=(query, "mysql") + ) + + +@before +def _executemany(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "query") + + vulns.run_vulnerability_scan( + kind="sql_injection", op="MySQLdb.Cursor.executemany", args=(query, "mysql") + ) diff --git a/aikido_zen/sinks/os.py b/aikido_zen/sinks/os.py index 9cf4f8809..8d2d96474 100644 --- a/aikido_zen/sinks/os.py +++ b/aikido_zen/sinks/os.py @@ -2,78 +2,55 @@ Sink module for python's `os` """ -import copy from pathlib import PurePath -import aikido_zen.importhook as importhook import aikido_zen.vulnerabilities as vulns +from aikido_zen.sinks import before, patch_function, on_import -# os.func(...) functions, can have a filename and destination. -OS_FILE_FUNCTIONS = [ - "access", - "chmod", - "chown", - "mkdir", - "listdir", - "readlink", - "unlink", - "rename", - "rmdir", - "remove", - "symlink", - "link", - "walk", - "open", -] -# os.path.realpath, os.path.abspath aren't wrapped, since they use os.path.join -OS_PATH_FUNCTIONS = ["getsize", "join", "expanduser", "expandvars"] +@before +def _os_patch(func, instance, args, kwargs): + possible_paths = args + tuple(kwargs.values()) + for path in possible_paths: + if not isinstance(path, (str, bytes, PurePath)): + continue + # change op if it's an os.path function + op = f"os.{func.__name__}" + if func.__name__ in ("getsize", "join", "expanduser", "expandvars", "realpath"): + op = f"os.path.{func.__name__}" -# os.makedirs() is not wrapped since it uses os.mkdir() which we wrap -# os.path.exists() and functions alike are not wrapped for performance reasons. -# We also don't wrap the stat library : https://docs.python.org/3/library/stat.html + vulns.run_vulnerability_scan(kind="path_traversal", op=op, args=(path,)) -def generate_aikido_function(op, former_func): +@on_import("os") +def patch(m): """ - Returns a generated aikido function given an operation - and the previous function + patching module os + - patches os.* functions that take in paths + - patches os.path.* functions that take in paths + - doesn't patch os.makedirs -> uses os.mkdir + - doesn't patch os.path.abspath -> uses os.path.join + - doesn't patch os.path.exists and others -> to big of a performance impact + - doesn't patch stat library https://docs.python.org/3/library/stat.html """ - - def aikido_new_func(*args, op=op, former_func=former_func, **kwargs): - for arg in args: - if isinstance(arg, (str, bytes, PurePath)): - vulns.run_vulnerability_scan( - kind="path_traversal", op=f"os.{op}", args=(arg,) - ) - return former_func(*args, **kwargs) - - return aikido_new_func - - -@importhook.on_import("os") -def on_os_import(os): - """ - Hook 'n wrap on `os` module, wrapping os.func(...) and os.path.func(...) - Returns : Modified os object - """ - modified_os = importhook.copy_module(os) - for op in OS_FILE_FUNCTIONS: - # Wrap os. functions - if not hasattr(os, op): - continue # Don't wrap methods that are specific to the OS (e.g. chown) - former_func = copy.deepcopy(getattr(os, op)) - aikido_new_func = generate_aikido_function(op, former_func) - setattr(os, op, aikido_new_func) - setattr(modified_os, op, aikido_new_func) - - for op in OS_PATH_FUNCTIONS: - # Wrap os.path functions - if not hasattr(os.path, op): - continue # Don't wrap methods that are specific to the OS - former_func = copy.deepcopy(getattr(os.path, op)) - aikido_new_func = generate_aikido_function(f"path.{op}", former_func) - setattr(os.path, op, aikido_new_func) - # pylint: disable=no-member - setattr(modified_os.path, op, aikido_new_func) - - return modified_os + # os.*(...) patches + patch_function(m, "access", _os_patch) + patch_function(m, "chmod", _os_patch) + patch_function(m, "chown", _os_patch) + patch_function(m, "mkdir", _os_patch) + patch_function(m, "listdir", _os_patch) + patch_function(m, "readlink", _os_patch) + patch_function(m, "unlink", _os_patch) + patch_function(m, "rename", _os_patch) + patch_function(m, "rmdir", _os_patch) + patch_function(m, "remove", _os_patch) + patch_function(m, "symlink", _os_patch) + patch_function(m, "link", _os_patch) + patch_function(m, "walk", _os_patch) + patch_function(m, "open", _os_patch) + + # os.path.*(...) patches + patch_function(m, "path.getsize", _os_patch) + patch_function(m, "path.join", _os_patch) + patch_function(m, "path.expanduser", _os_patch) + patch_function(m, "path.expandvars", _os_patch) + patch_function(m, "path.realpath", _os_patch) # Python 3.13 diff --git a/aikido_zen/sinks/os_system.py b/aikido_zen/sinks/os_system.py index 942973b07..52d7887bc 100644 --- a/aikido_zen/sinks/os_system.py +++ b/aikido_zen/sinks/os_system.py @@ -2,28 +2,28 @@ Sink module for `os`, wrapping os.system """ -import aikido_zen.importhook as importhook import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import patch_function, before, on_import -@importhook.on_import("os") -def on_os_import(os): - """ - Hook 'n wrap on `os.system()` function - Returns : Modified os object - We don't wrap os.popen() since this command uses subprocess.Popen, which we - already wrap and protect in the subprocess.py sink. - We also don't wrap os.execl, os.execle, os.execlp, ... because these should only be vulnerable - to argument injection, which we currently don't protect against. - """ - modified_os = importhook.copy_module(os) +@before +def _system(func, instance, args, kwargs): + command = get_argument(args, kwargs, 0, "command") + if not isinstance(command, str): + return + + vulns.run_vulnerability_scan( + kind="shell_injection", op="os.system", args=(command,) + ) - def aikido_new_system(command, *args, **kwargs): - if isinstance(command, str): - vulns.run_vulnerability_scan( - kind="shell_injection", op="os.system", args=(command,) - ) - return os.system(command, *args, **kwargs) - setattr(modified_os, "system", aikido_new_system) - return modified_os +@on_import("os") +def patch(m): + """ + patching os module + - patches os.system for shell injection + - does not patch: os.popen -> uses subprocess.Popen + - does not patch: os.execl, os.execle, os.execlp, ... -> only argument injection + """ + patch_function(m, "system", _system) diff --git a/aikido_zen/sinks/psycopg.py b/aikido_zen/sinks/psycopg.py index 18672c869..2ea4dd66e 100644 --- a/aikido_zen/sinks/psycopg.py +++ b/aikido_zen/sinks/psycopg.py @@ -2,50 +2,34 @@ Sink module for `psycopg` """ -import copy -import aikido_zen.importhook as importhook -from aikido_zen.background_process.packages import is_package_compatible import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import patch_function, on_import, before -REQUIRED_PSYCOPG_VERSION = "3.1.0" - -@importhook.on_import("psycopg.cursor") -def on_psycopg_import(psycopg): +@on_import("psycopg.cursor", "psycopg", version_requirement="3.1.0") +def patch(m): """ - Hook 'n wrap on `psycopg.connect` function, we modify the cursor_factory - of the result of this connect function. + patching module psycopg.cursor + - patches Cursor.copy + - patches Cursor.execute + - patches Cursor.executemany """ - if not is_package_compatible("psycopg", REQUIRED_PSYCOPG_VERSION): - return psycopg - modified_psycopg = importhook.copy_module(psycopg) - former_copy_funtcion = copy.deepcopy(psycopg.Cursor.copy) - former_execute_function = copy.deepcopy(psycopg.Cursor.execute) - former_executemany_function = copy.deepcopy(psycopg.Cursor.executemany) - - def aikido_copy(self, statement, params=None, *args, **kwargs): - sql = statement - vulns.run_vulnerability_scan( - kind="sql_injection", op="psycopg.Cursor.copy", args=(sql, "postgres") - ) - return former_copy_funtcion(self, statement, params, *args, **kwargs) + patch_function(m, "Cursor.copy", _copy) + patch_function(m, "Cursor.execute", _execute) + patch_function(m, "Cursor.executemany", _execute) - def aikido_execute(self, query, params=None, *args, **kwargs): - sql = query - vulns.run_vulnerability_scan( - kind="sql_injection", op="psycopg.Cursor.execute", args=(sql, "postgres") - ) - return former_execute_function(self, query, params, *args, **kwargs) - def aikido_executemany(self, query, params_seq): - args = (query, "postgres") - op = "psycopg.Cursor.executemany" - vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=args) - return former_executemany_function(self, query, params_seq) +@before +def _copy(func, instance, args, kwargs): + statement = get_argument(args, kwargs, 0, "statement") + vulns.run_vulnerability_scan( + kind="sql_injection", op="psycopg.Cursor.copy", args=(statement, "postgres") + ) - setattr(psycopg.Cursor, "copy", aikido_copy) # pylint: disable=no-member - setattr(psycopg.Cursor, "execute", aikido_execute) # pylint: disable=no-member - # pylint: disable=no-member - setattr(psycopg.Cursor, "executemany", aikido_executemany) - return modified_psycopg +@before +def _execute(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "query") + op = f"psycopg.Cursor.{func.__name__}" + vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "postgres")) diff --git a/aikido_zen/sinks/psycopg2.py b/aikido_zen/sinks/psycopg2.py index afd5a0fe5..ee812c748 100644 --- a/aikido_zen/sinks/psycopg2.py +++ b/aikido_zen/sinks/psycopg2.py @@ -2,68 +2,57 @@ Sink module for `psycopg2` """ -import copy -import aikido_zen.importhook as importhook from aikido_zen.background_process.packages import is_package_compatible import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import on_import, before, patch_function, after -PSYCOPG2_REQUIRED_VERSION = "2.9.2" +@on_import("psycopg2") +def patch(m): + """ + patching module psycopg2 + - patches psycopg2.connect + cannot set 'execute' attribute of immutable type 'psycopg2.extensions.cursor', + so we create our own cursor factory to bypass this limitation. + """ + compatible = is_package_compatible( + required_version="2.9.2", packages=["psycopg2", "psycopg2-binary"] + ) + if not compatible: + # Users can install either psycopg2 or psycopg2-binary, we need to check if at least + # one is installed and if they meet version requirements + return -def wrap_cursor_factory(cursor_factory): - former_cursor_factory = copy.deepcopy(cursor_factory) - import psycopg2.extensions as ext - - class AikidoWrappedCursor(ext.cursor): - def execute(self, *args, **kwargs): - """Aikido's wrapped execute function""" - vulns.run_vulnerability_scan( - kind="sql_injection", - op="psycopg2.Connection.Cursor.execute", - args=(args[0], "postgres"), # args[0] : sql - ) - if former_cursor_factory and hasattr(former_cursor_factory, "execute"): - return former_cursor_factory.execute(self, *args, **kwargs) - return ext.cursor.execute(self, *args, **kwargs) - - def executemany(self, *args, **kwargs): - """Aikido's wrapped executemany function""" - sql = args[0] # The data is double, but sql only once. - vulns.run_vulnerability_scan( - kind="sql_injection", - op="psycopg2.Connection.Cursor.executemany", - args=(sql, "postgres"), - ) - if former_cursor_factory and hasattr(former_cursor_factory, "executemany"): - return former_cursor_factory.executemany(self, *args, **kwargs) - return ext.cursor.executemany(self, *args, **kwargs) - - return AikidoWrappedCursor + patch_function(m, "connect", _connect) -@importhook.on_import("psycopg2") -def on_psycopg2_import(psycopg2): +@after +def _connect(func, instance, _args, _kwargs, rv): """ - Hook 'n wrap on `psycopg2.connect` function, we modify the cursor_factory - of the result of this connect function. + the return value (rv) is the new connection object, we'll change the cursor_factory attribute here. """ - # Users can install either psycopg2 or psycopg2-binary, we need to check if at least - # one is installed and if they meet version requirements : - if not is_package_compatible( - required_version=PSYCOPG2_REQUIRED_VERSION, - packages=["psycopg2", "psycopg2-binary"], - ): - # Both psycopg2 and psycopg2-binary are not supported, don't wrapping - return psycopg2 - modified_psycopg2 = importhook.copy_module(psycopg2) - former_connect_function = copy.deepcopy(psycopg2.connect) - - def aikido_connect(*args, **kwargs): - former_conn = former_connect_function(*args, **kwargs) - former_conn.cursor_factory = wrap_cursor_factory(former_conn.cursor_factory) - return former_conn - - # pylint: disable=no-member - setattr(psycopg2, "connect", aikido_connect) - setattr(modified_psycopg2, "connect", aikido_connect) - return modified_psycopg2 + if rv.cursor_factory is None: + # set default if not set + import psycopg2.extensions + + rv.cursor_factory = psycopg2.extensions.cursor + + rv.cursor_factory = type( + "AikidoPsycopg2Cursor", + (rv.cursor_factory,), + { + # Allows us to modify these otherwise immutable functions + "execute": rv.cursor_factory.execute, + "executemany": rv.cursor_factory.executemany, + }, + ) + patch_function(rv.cursor_factory, "execute", psycopg2_patch) + patch_function(rv.cursor_factory, "executemany", psycopg2_patch) + + +@before +def psycopg2_patch(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "query") + op = f"psycopg2.Connection.Cursor.{func.__name__}" + vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "postgres")) diff --git a/aikido_zen/sinks/pymongo.py b/aikido_zen/sinks/pymongo.py index 0db9bebe4..fa1065a90 100644 --- a/aikido_zen/sinks/pymongo.py +++ b/aikido_zen/sinks/pymongo.py @@ -2,92 +2,98 @@ Sink module for `pymongo` """ -from copy import deepcopy -import aikido_zen.importhook as importhook -from aikido_zen.helpers.logging import logger -import aikido_zen.background_process.packages as pkgs +from aikido_zen.helpers.get_argument import get_argument import aikido_zen.vulnerabilities as vulns +from . import patch_function, on_import, before -# find_one not present in list since find_one calls find function. - -OPERATIONS_WITH_FILTER = [ - ("replace_one", [0, "filter"]), - ("update_one", [0, "filter"]), - ("update_many", [0, "filter"]), - ("delete_one", [0, "filter"]), - ("delete_many", [0, "filter"]), - ("count_documents", [0, "filter"]), - ("find_one_and_delete", [0, "filter"]), - ("find_one_and_replace", [0, "filter"]), - ("find_one_and_update", [0, "filter"]), - ("find", [0, "filter"]), - ("find_raw_batches", [0, "filter"]), - ("distinct", [1, "filter"]), - ("watch", [0, "pipeline"]), - ("aggregate", [0, "pipeline"]), - ("aggregate_raw_batches", [0, "pipeline"]), -] - -REQUIRED_PYMONGO_VERSION = "3.10.0" - - -# Synchronous : -@importhook.on_import("pymongo.collection") -def on_pymongo_import(pymongo): + +@on_import("pymongo.collection", "pymongo", version_requirement="3.10.0") +def patch(m): """ - Hook 'n wrap on `pymongo.collection` - Our goal is to wrap the following functions in the Collection class : - https://github.com/mongodb/mongo-python-driver/blob/98658cfd1fea42680a178373333bf27f41153759/pymongo/synchronous/collection.py#L136 - Returns : Modified pymongo.collection.Collection object + patching pymongo.collection + - patches Collection.*(filter, ...) + - patches Collection.*(..., filter, ...) + - patches Collection.*(pipeline, ...) + - patches Collection.bulk_write + src: https://github.com/mongodb/mongo-python-driver/blob/98658cfd1fea42680a178373333bf27f41153759/pymongo/synchronous/collection.py#L136 """ - if not pkgs.is_package_compatible("pymongo", REQUIRED_PYMONGO_VERSION): - return pymongo - modified_pymongo = importhook.copy_module(pymongo) - for op_data in OPERATIONS_WITH_FILTER: - op = op_data[0] - if not hasattr(pymongo.Collection, op): - logger.warning("Operation `%s` not found on Collection object.", op) - - prev_func = deepcopy(getattr(pymongo.Collection, op)) - - def wrapped_op_func( - self, - *args, - prev_func=prev_func, - op_data=op_data, - **kwargs, - ): - op, spot, key = op_data[0], op_data[1][0], op_data[1][1] - data = None - if kwargs.get(key, None): - # Keyword found, setting data - data = kwargs.get(key) - elif len(args) > spot and args[spot]: - data = args[spot] - if data: - vulns.run_vulnerability_scan( - kind="nosql_injection", - op=f"pymongo.collection.Collection.{op}", - args=(data,), - ) - - return prev_func(self, *args, **kwargs) - - setattr(modified_pymongo.Collection, op, wrapped_op_func) - - # Add bulk_write support : - former_bulk_write = deepcopy(pymongo.Collection.bulk_write) - - def aikido_bulk_write(self, requests, *args, **kwargs): - for request in requests: - if hasattr(request, "_filter"): - # Requested operation has a filter - vulns.run_vulnerability_scan( - kind="nosql_injection", - op="pymongo.collection.Collection.bulk_write", - args=(request._filter,), - ) - return former_bulk_write(self, requests, *args, **kwargs) - - setattr(modified_pymongo.Collection, "bulk_write", aikido_bulk_write) - return modified_pymongo + # func(filter, ...) + patch_function(m, "Collection.replace_one", _func_filter_first) + patch_function(m, "Collection.update_one", _func_filter_first) + patch_function(m, "Collection.update_many", _func_filter_first) + patch_function(m, "Collection.delete_one", _func_filter_first) + patch_function(m, "Collection.delete_many", _func_filter_first) + patch_function(m, "Collection.count_documents", _func_filter_first) + patch_function(m, "Collection.find_one_and_delete", _func_filter_first) + patch_function(m, "Collection.find_one_and_replace", _func_filter_first) + patch_function(m, "Collection.find_one_and_update", _func_filter_first) + patch_function(m, "Collection.find", _func_filter_first) + patch_function(m, "Collection.find_raw_batches", _func_filter_first) + # find_one not present in list since find_one calls find function. + + # func(..., filter, ...) + patch_function(m, "Collection.distinct", _func_filter_second) + + # func(pipeline, ...) + patch_function(m, "Collection.watch", _func_pipeline) + patch_function(m, "Collection.aggregate", _func_pipeline) + patch_function(m, "Collection.aggregate_raw_batches", _func_pipeline) + + # bulk_write + patch_function(m, "Collection.bulk_write", _bulk_write) + + +@before +def _func_filter_first(func, instance, args, kwargs): + """Collection.func(filter, ...)""" + nosql_filter = get_argument(args, kwargs, 0, "filter") + if not nosql_filter: + return + + vulns.run_vulnerability_scan( + kind="nosql_injection", + op=f"pymongo.collection.Collection.{func.__name__}", + args=(nosql_filter,), + ) + + +@before +def _func_filter_second(func, instance, args, kwargs): + """Collection.func(..., filter, ...)""" + nosql_filter = get_argument(args, kwargs, 1, "filter") + if not nosql_filter: + return + + vulns.run_vulnerability_scan( + kind="nosql_injection", + op=f"pymongo.collection.Collection.{func.__name__}", + args=(nosql_filter,), + ) + + +@before +def _func_pipeline(func, instance, args, kwargs): + """Collection.func(pipeline, ...)""" + nosql_pipeline = get_argument(args, kwargs, 0, "pipeline") + if not nosql_pipeline: + return + + vulns.run_vulnerability_scan( + kind="nosql_injection", + op=f"pymongo.collection.Collection.{func.__name__}", + args=(nosql_pipeline,), + ) + + +@before +def _bulk_write(func, instance, args, kwargs): + requests = get_argument(args, kwargs, 0, "requests") + + # Filter requests that contain "_filter" + requests_with_filter = [req for req in requests if hasattr(req, "_filter")] + for request in requests_with_filter: + vulns.run_vulnerability_scan( + kind="nosql_injection", + op="pymongo.collection.Collection.bulk_write", + args=(request._filter,), + ) diff --git a/aikido_zen/sinks/pymysql.py b/aikido_zen/sinks/pymysql.py index ae1e5fdb1..c100039bc 100644 --- a/aikido_zen/sinks/pymysql.py +++ b/aikido_zen/sinks/pymysql.py @@ -2,47 +2,39 @@ Sink module for `pymysql` """ -import copy -import logging -import aikido_zen.importhook as importhook -from aikido_zen.background_process.packages import is_package_compatible import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import patch_function, on_import, before -logger = logging.getLogger("aikido_zen") -REQUIRED_PYMYSQL_VERSION = "0.9.0" +@before +def _execute(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "query") + if isinstance(query, bytearray): + # If query is type bytearray, it will be picked up by our wrapping of executemany + return + vulns.run_vulnerability_scan( + kind="sql_injection", op="pymysql.Cursor.execute", args=(query, "mysql") + ) -@importhook.on_import("pymysql.cursors") -def on_pymysql_import(mysql): + +@before +def _executemany(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "query") + + vulns.run_vulnerability_scan( + kind="sql_injection", op="pymysql.Cursor.executemany", args=(query, "mysql") + ) + + +@on_import("pymysql.cursors", "pymysql", version_requirement="0.9.0") +def patch(m): """ - Hook 'n wrap on `pymysql.cursors` - Our goal is to wrap execute() and executemany() on Cursor class + patching `pymysql.cursors` + - patches Cursor.execute(query) + - patches Cursor.executemany(query) https://github.com/PyMySQL/PyMySQL/blob/95635f587ba9076e71a223b113efb08ac34a361d/pymysql/cursors.py#L133 - Returns : Modified pymysql.cursors object """ - if not is_package_compatible("pymysql", REQUIRED_PYMYSQL_VERSION): - return mysql - modified_mysql = importhook.copy_module(mysql) - - prev_execute_func = copy.deepcopy(mysql.Cursor.execute) - prev_executemany_func = copy.deepcopy(mysql.Cursor.executemany) - - def aikido_new_execute(self, query, args=None): - if isinstance(query, bytearray): - logger.debug("Query is bytearray, normally comes from executemany.") - return prev_execute_func(self, query, args) - vulns.run_vulnerability_scan( - kind="sql_injection", op="pymysql.Cursor.execute", args=(query, "mysql") - ) - return prev_execute_func(self, query, args) - - def aikido_new_executemany(self, query, args): - op = "pymysql.Cursor.executemany" - vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "mysql")) - return prev_executemany_func(self, query, args) - - setattr(mysql.Cursor, "execute", aikido_new_execute) - setattr(mysql.Cursor, "executemany", aikido_new_executemany) - - return modified_mysql + patch_function(m, "Cursor.execute", _execute) + patch_function(m, "Cursor.executemany", _executemany) diff --git a/aikido_zen/sinks/shutil.py b/aikido_zen/sinks/shutil.py index 1e04e7c2a..0d8805d62 100644 --- a/aikido_zen/sinks/shutil.py +++ b/aikido_zen/sinks/shutil.py @@ -2,51 +2,32 @@ Sink module for python's `shutil` """ -import copy -import aikido_zen.importhook as importhook import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import on_import, patch_function, before -# File functions func(src, dst, *, **) -SHUTIL_SRC_DST_FUNCTIONS = [ - "copymode", - "copystat", - "copytree", - "move", -] -# shutil.copyfile(src, dst, *, **) => builtins.open -# shutil.copy(src, dst, *, **) => builtins.open -# shutil.copy2(src, dst, *, **) => builtins.open +@before +def _shutil_func(func, instance, args, kwargs): + source = get_argument(args, kwargs, 0, "src") + destination = get_argument(args, kwargs, 1, "dst") -def generate_aikido_function(aik_op, func): - """ - Returns a generated aikido function given an operation - and the previous function - """ - - def wrapper(src, dst, *args, **kwargs): - kind = "path_traversal" - op = f"shutil.{aik_op}" - if src: - vulns.run_vulnerability_scan(kind, op, args=(src,)) - if dst: - vulns.run_vulnerability_scan(kind, op, args=(dst,)) - return func(src, dst, *args, **kwargs) + kind = "path_traversal" + op = f"shutil.{func.__name__}" + if isinstance(source, str): + vulns.run_vulnerability_scan(kind, op, args=(source,)) + if isinstance(destination, str): + vulns.run_vulnerability_scan(kind, op, args=(destination,)) - return wrapper - -@importhook.on_import("shutil") -def on_shutil_import(shutil): +@on_import("shutil") +def patch(m): """ - Hook 'n wrap on `shutil`, python's built-in functions - Our goal is to wrap functions found in SHUTIL_SRC_DST_FUNCTIONS - Returns : Modified shutil object + patching module shutil + - patches: copymode, copystat, copytree, move + - does not patch: copyfile, copy, copy2 -> uses builtins.open """ - modified_shutil = importhook.copy_module(shutil) - for op in SHUTIL_SRC_DST_FUNCTIONS: - # Wrap shutil. functions - aikido_new_func = generate_aikido_function(op, getattr(shutil, op)) - setattr(modified_shutil, op, aikido_new_func) - - return modified_shutil + patch_function(m, "copymode", _shutil_func) + patch_function(m, "copystat", _shutil_func) + patch_function(m, "copytree", _shutil_func) + patch_function(m, "move", _shutil_func) diff --git a/aikido_zen/sinks/socket.py b/aikido_zen/sinks/socket.py index 5dfcb748a..fc613db19 100644 --- a/aikido_zen/sinks/socket.py +++ b/aikido_zen/sinks/socket.py @@ -2,47 +2,23 @@ Sink module for `socket` """ -import copy -import aikido_zen.importhook as importhook -from aikido_zen.helpers.logging import logger +from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.sinks import on_import, patch_function, after from aikido_zen.vulnerabilities import run_vulnerability_scan -SOCKET_OPERATIONS = [ - "gethostbyname", - "gethostbyaddr", - "getaddrinfo", - "create_connection", -] +@after +def _getaddrinfo(func, instance, args, kwargs, return_value): + host = get_argument(args, kwargs, 0, "host") + port = get_argument(args, kwargs, 1, "port") + arguments = (return_value, host, port) # return_value = dns response + run_vulnerability_scan(kind="ssrf", op="socket.getaddrinfo", args=arguments) -def generate_aikido_function(former_func, op): - """ - Generates a new aikido function given a former function and op - """ - - def aik_new_func(*args, **kwargs): - res = former_func(*args, **kwargs) - if op == "getaddrinfo": - run_vulnerability_scan( - kind="ssrf", op="socket.getaddrinfo", args=(res, args[0], args[1]) - ) - return res - return aik_new_func - - -@importhook.on_import("socket") -def on_socket_import(socket): +@on_import("socket") +def patch(m): """ - Hook 'n wrap on `socket` - Our goal is to wrap the getaddrinfo socket function - https://github.com/python/cpython/blob/8f19be47b6a50059924e1d7b64277ad3cef4dac7/Lib/socket.py#L10 - Returns : Modified socket object + patching module socket + - patches getaddrinfo(host, port, ...) """ - modified_socket = importhook.copy_module(socket) - for op in SOCKET_OPERATIONS: - former_func = copy.deepcopy(getattr(socket, op)) - setattr(modified_socket, op, generate_aikido_function(former_func, op)) - setattr(socket, op, generate_aikido_function(former_func, op)) - - return modified_socket + patch_function(m, "getaddrinfo", _getaddrinfo) diff --git a/aikido_zen/sinks/subprocess.py b/aikido_zen/sinks/subprocess.py index 92c8b0a6d..2e20fbfbb 100644 --- a/aikido_zen/sinks/subprocess.py +++ b/aikido_zen/sinks/subprocess.py @@ -2,71 +2,42 @@ Sink module for `subprocess` """ -import copy -import aikido_zen.importhook as importhook import aikido_zen.vulnerabilities as vulns from aikido_zen.helpers.get_argument import get_argument - -SUBPROCESS_OPERATIONS = ["check_output"] -# check_call, call, and run all call Popen class - - -def generate_aikido_function(op, former_func): - """ - Generates an aikido shell function given - an operation and a former function - """ - - def aikido_new_func(*args, op=op, former_func=former_func, **kwargs): - shell_enabled = kwargs.get("shell") - - position = ( - 1 if op == "Popen" else 0 - ) # If it's a constructor, first argument is self - shell_arguments = get_argument(args, kwargs, pos=position, name="args") - - command = None - if isinstance(shell_arguments, str): - command = shell_arguments - elif hasattr(shell_arguments, "__iter__"): - # Check if args is an iterable i.e. list, dict, tuple - # If it is we join it with spaces to run the shell_injection algorithm. - command = " ".join(shell_arguments) - - # For all operations above: call, run, check_call, Popen, check_output, default value - # of the shell property is False. - if command and shell_enabled: - vulns.run_vulnerability_scan( - kind="shell_injection", - op=f"subprocess.{op}", - args=(command,), - ) - return former_func(*args, **kwargs) - - return aikido_new_func +from aikido_zen.sinks import on_import, patch_function, before + + +def try_join_iterable(iterable): + try: + return " ".join(iterable) + except Exception: + return None + + +@before +def _subprocess_init(func, instance, args, kwargs): + shell_arguments = get_argument(args, kwargs, 0, "args") + shell_enabled = get_argument(args, kwargs, 8, "shell") + if not shell_enabled: + return # default shell property is False, we only want to scan if it's True + + command = try_join_iterable(shell_arguments) + if isinstance(shell_arguments, str): + command = shell_arguments + if not command: + return + vulns.run_vulnerability_scan( + kind="shell_injection", + op=f"subprocess.Popen", + args=(command,), + ) -@importhook.on_import("subprocess") -def on_subprocess_import(subprocess): +@on_import("subprocess") +def patch(m): """ - Hook 'n wrap on `subproccess`, wrapping multiple functions - Returns : Modified subprocess object + patching subprocess module + - patches Popen.__init__ constructor + - does not patch: check_output, check_call, call, and run (call Popen class) """ - modified_subprocess = importhook.copy_module(subprocess) - for op in SUBPROCESS_OPERATIONS: - former_func = copy.deepcopy(getattr(subprocess, op)) - setattr( - modified_subprocess, - op, - generate_aikido_function(op=op, former_func=former_func), - ) - - # Wrap Class Popen seperately: - former_popen_constructor = copy.deepcopy(subprocess.Popen.__init__) - setattr( - getattr(modified_subprocess, "Popen"), - "__init__", # Popen is a class, modify it's constructor - generate_aikido_function(op="Popen", former_func=former_popen_constructor), - ) - - return modified_subprocess + patch_function(m, "Popen.__init__", _subprocess_init) diff --git a/aikido_zen/sinks/tests/os_test.py b/aikido_zen/sinks/tests/os_test.py index c3bd96489..fc55f66fc 100644 --- a/aikido_zen/sinks/tests/os_test.py +++ b/aikido_zen/sinks/tests/os_test.py @@ -13,9 +13,9 @@ def test_ospath_commands(): import os os.path.realpath("test/test2") - op = "os.path.join" + op = "os.path.realpath" args = ("test/test2",) - mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args) + mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args) with pytest.raises(FileNotFoundError): os.path.getsize("aqkqjefbkqlleq_qkvfjksaicuaviel") @@ -25,9 +25,9 @@ def test_ospath_commands(): mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args) os.path.realpath(b"te2st/test2") - op = "os.path.join" + op = "os.path.realpath" args = (b"te2st/test2",) - mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args) + mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args) path1 = Path("./", "../", "test/../test2") with pytest.raises(FileNotFoundError): diff --git a/aikido_zen/sinks/tests/psycopg2_test.py b/aikido_zen/sinks/tests/psycopg2_test.py index 483475f74..a888a4fef 100644 --- a/aikido_zen/sinks/tests/psycopg2_test.py +++ b/aikido_zen/sinks/tests/psycopg2_test.py @@ -1,3 +1,5 @@ +import sys + import pytest from unittest.mock import patch import aikido_zen.sinks.psycopg2 @@ -6,16 +8,51 @@ kind = "sql_injection" op = "pymysql.connections.query" +# psycopg2 not working on python 3.13 for now +skip_python_3_13 = sys.version_info[:2] == (3, 13) + @pytest.fixture def database_conn(): import psycopg2 + from psycopg2.extras import DictCursor + + return psycopg2.connect( + cursor_factory=DictCursor, + host="127.0.0.1", + user="user", + password="password", + database="db", + ) + + +@pytest.fixture +def database_conn_immutable_cursor(): + import psycopg2 + from psycopg2.extensions import cursor return psycopg2.connect( - host="127.0.0.1", user="user", password="password", database="db" + cursor_factory=cursor, + host="127.0.0.1", + user="user", + password="password", + database="db", ) +@pytest.fixture +def database_conn_empty_cursor(): + import psycopg2 + + return psycopg2.connect( + host="127.0.0.1", + user="user", + password="password", + database="db", + ) + + +@pytest.mark.skipif(skip_python_3_13, reason="Skipping test on Python 3.13") def test_cursor_execute(database_conn): reset_comms() with patch( @@ -35,6 +72,47 @@ def test_cursor_execute(database_conn): mock_run_vulnerability_scan.assert_called_once() +@pytest.mark.skipif(skip_python_3_13, reason="Skipping test on Python 3.13") +def test_cursor_execute2(database_conn_empty_cursor): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn_empty_cursor.cursor() + query = "SELECT * FROM dogs" + cursor.execute(query) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "postgres" + mock_run_vulnerability_scan.assert_called_once() + + cursor.fetchall() + database_conn_empty_cursor.close() + mock_run_vulnerability_scan.assert_called_once() + + +@pytest.mark.skipif(skip_python_3_13, reason="Skipping test on Python 3.13") +def test_cursor_execute3(database_conn_immutable_cursor): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn_immutable_cursor.cursor() + query = "SELECT * FROM dogs" + cursor.execute(query) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "postgres" + mock_run_vulnerability_scan.assert_called_once() + + cursor.fetchall() + database_conn_immutable_cursor.close() + mock_run_vulnerability_scan.assert_called_once() + + +@pytest.mark.skipif(skip_python_3_13, reason="Skipping test on Python 3.13") def test_cursor_execute_parameterized(database_conn): reset_comms() with patch( @@ -47,13 +125,31 @@ def test_cursor_execute_parameterized(database_conn): called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] assert called_with_args[0] == query assert called_with_args[1] == "postgres" - mock_run_vulnerability_scan.assert_called_once() - database_conn.commit() database_conn.close() + + +@pytest.mark.skipif(skip_python_3_13, reason="Skipping test on Python 3.13") +def test_cursor_execute_parameterized2(database_conn_empty_cursor): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn_empty_cursor.cursor() + query = "INSERT INTO dogs (dog_name, isadmin) VALUES (%s, %s)" + cursor.execute(query, ("doggo", True)) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "postgres" + mock_run_vulnerability_scan.assert_called_once() + + database_conn_empty_cursor.commit() + database_conn_empty_cursor.close() mock_run_vulnerability_scan.assert_called_once() +@pytest.mark.skipif(skip_python_3_13, reason="Skipping test on Python 3.13") def test_cursor_executemany(database_conn): reset_comms() with patch( @@ -72,9 +168,61 @@ def test_cursor_executemany(database_conn): == "INSERT INTO dogs (dog_name, isadmin) VALUES (%s, %s)" ) assert called_with_args[1] == "postgres" - mock_run_vulnerability_scan.assert_called_once() database_conn.commit() cursor.close() database_conn.close() + + +@pytest.mark.skipif(skip_python_3_13, reason="Skipping test on Python 3.13") +def test_cursor_executemany2(database_conn_empty_cursor): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn_empty_cursor.cursor() + data = [ + ("Doggy", False), + ("Doggy 2", True), + ("Dogski", True), + ] + cursor.executemany("INSERT INTO dogs (dog_name, isadmin) VALUES (%s, %s)", data) + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] + == "INSERT INTO dogs (dog_name, isadmin) VALUES (%s, %s)" + ) + assert called_with_args[1] == "postgres" + mock_run_vulnerability_scan.assert_called_once() + + database_conn_empty_cursor.commit() + cursor.close() + database_conn_empty_cursor.close() + mock_run_vulnerability_scan.assert_called_once() + + +@pytest.mark.skipif(skip_python_3_13, reason="Skipping test on Python 3.13") +def test_cursor_executemany3(database_conn_immutable_cursor): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn_immutable_cursor.cursor() + data = [ + ("Doggy", False), + ("Doggy 2", True), + ("Dogski", True), + ] + cursor.executemany("INSERT INTO dogs (dog_name, isadmin) VALUES (%s, %s)", data) + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] + == "INSERT INTO dogs (dog_name, isadmin) VALUES (%s, %s)" + ) + assert called_with_args[1] == "postgres" + mock_run_vulnerability_scan.assert_called_once() + + database_conn_immutable_cursor.commit() + cursor.close() + database_conn_immutable_cursor.close() mock_run_vulnerability_scan.assert_called_once() diff --git a/aikido_zen/sinks/tests/shutil_test.py b/aikido_zen/sinks/tests/shutil_test.py index adfaf1f6c..a9e3efc40 100644 --- a/aikido_zen/sinks/tests/shutil_test.py +++ b/aikido_zen/sinks/tests/shutil_test.py @@ -28,7 +28,7 @@ def test_shutil_copyfile(): assert call_2.kwargs["args"] == args2 -def test_shutil_copyfile(): +def test_shutil_copyfile_2(): with patch( "aikido_zen.vulnerabilities.run_vulnerability_scan" ) as mock_run_vulnerability_scan: @@ -135,7 +135,6 @@ def test_shutil_copy(): op = "builtins.open" args1 = ("Makefile",) args2 = ("test2",) - assert len(mock_run_vulnerability_scan.call_args_list) == 3 call_1 = mock_run_vulnerability_scan.call_args_list[0] call_2 = mock_run_vulnerability_scan.call_args_list[1] @@ -155,7 +154,6 @@ def test_shutil_copy2(): op = "builtins.open" args1 = ("Makefile",) args2 = ("test2",) - assert len(mock_run_vulnerability_scan.call_args_list) == 3 call_1 = mock_run_vulnerability_scan.call_args_list[0] call_2 = mock_run_vulnerability_scan.call_args_list[1] diff --git a/aikido_zen/sinks/tests/subprocess_test.py b/aikido_zen/sinks/tests/subprocess_test.py index a155725b1..8a8bb4735 100644 --- a/aikido_zen/sinks/tests/subprocess_test.py +++ b/aikido_zen/sinks/tests/subprocess_test.py @@ -171,34 +171,34 @@ def test_subprocess_check_output(): ) as mock_run_vulnerability_scan: import subprocess - op = "subprocess.check_output" + op = "subprocess.Popen" subprocess.check_output(["ls", "-la"], shell=True) args = ("ls -la",) - mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args) + mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args) with pytest.raises(subprocess.CalledProcessError): subprocess.check_output(["cfsknflks"], shell=True) args = ("cfsknflks",) - mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args) + mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args) with pytest.raises(subprocess.CalledProcessError): subprocess.check_output(("tuple", "command"), shell=True) args = ("tuple command",) - mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args) + mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args) with pytest.raises(subprocess.CalledProcessError): subprocess.check_output({"key": "value"}, shell=True) args = ("key",) - mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args) + mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args) with pytest.raises(subprocess.CalledProcessError): subprocess.check_output({"ke": "value", "key2": "value2"}, shell=True) args = ("ke key2",) - mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args) + mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args) -def test_subprocess_check_output(): +def test_subprocess_check_output_2(): with patch( "aikido_zen.vulnerabilities.run_vulnerability_scan" ) as mock_run_vulnerability_scan: