From 80151a7526951559f281547e6b585a58d669eed6 Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Tue, 30 Jul 2024 10:19:48 +0200 Subject: [PATCH 1/4] Renaming and splitting up files --- .../background_process/__init__.py | 131 +----------------- .../aikido_background_process.py | 72 ++++++++++ aikido_firewall/background_process/comms.py | 75 ++++++++++ .../background_process/init_test.py | 10 +- aikido_firewall/sinks/pymysql.py | 2 +- 5 files changed, 158 insertions(+), 132 deletions(-) create mode 100644 aikido_firewall/background_process/aikido_background_process.py create mode 100644 aikido_firewall/background_process/comms.py diff --git a/aikido_firewall/background_process/__init__.py b/aikido_firewall/background_process/__init__.py index af66b9785..a9230ddf5 100644 --- a/aikido_firewall/background_process/__init__.py +++ b/aikido_firewall/background_process/__init__.py @@ -13,140 +13,19 @@ from threading import Thread from queue import Queue from aikido_firewall.helpers.logging import logger +from aikido_firewall.background_process.comms import AikidoIPCCommunications -REPORT_SEC_INTERVAL = 600 # 10 minutes IPC_ADDRESS = ("localhost", 9898) # Specify the IP address and port -class AikidoBackgroundProcess: - """ - Aikido's background process consists of 2 threads : - - (main) Listening thread which listens on an IPC socket for incoming data - - (spawned) reporting thread which will collect the IPC data and send it to a Reporter - """ - - def __init__(self, address, key): - logger.debug("Background process started") - try: - listener = con.Listener(address, authkey=key) - except OSError: - logger.warning( - "Aikido listener may already be running on port %s", address[1] - ) - pid = os.getpid() - os.kill(pid, signal.SIGTERM) # Kill this subprocess - self.queue = Queue() - # Start reporting thread : - Thread(target=self.reporting_thread).start() - - while True: - conn = listener.accept() - logger.debug("connection accepted from %s", listener.last_accepted) - while True: - data = conn.recv() - logger.debug("Incoming data : %s", data) - if data[0] == "ATTACK": - self.queue.put(data[1]) - elif data[0] == "CLOSE": # this is a kind of EOL for python IPC - conn.close() - break - elif data[0] == "KILL": # when main process quits , or during testing etc - logger.debug("Killing subprocess") - conn.close() - pid = os.getpid() - os.kill(pid, signal.SIGTERM) # Kill this subprocess - - def reporting_thread(self): - """Reporting thread""" - logger.debug("Started reporting thread") - while True: - self.send_to_reporter() - time.sleep(REPORT_SEC_INTERVAL) - - def send_to_reporter(self): - """ - Reports the found data to an Aikido server - """ - items_to_report = [] - while not self.queue.empty(): - items_to_report.append(self.queue.get()) - logger.debug("Reporting to aikido server") - logger.critical("Items to report : %s", items_to_report) - # Currently not making API calls - - -# pylint: disable=invalid-name # This variable does change -ipc = None - - -def get_comms(): - """ - Returns the globally stored IPC object, which you need - to communicate to our background process. - """ - return ipc - - -def reset_comms(): - """This will reset communications""" - global ipc - if ipc: - ipc.send_data("KILL", {}) - ipc = None - - def start_background_process(): """ Starts a process to handle incoming/outgoing data """ - # pylint: disable=global-statement # We need this to be global - global ipc + # Generate a secret key : generated_key_bytes = secrets.token_bytes(32) - ipc = IPC(IPC_ADDRESS, generated_key_bytes) - ipc.start_aikido_listener() - - -class IPC: - """ - Facilitates Inter-Process communication - """ - - def __init__(self, address, key): - # The key needs to be in byte form - self.address = address - self.key = key - - def start_aikido_listener(self): - """This will start the aikido process which listens""" - pid = os.fork() - if pid == 0: # Child process - AikidoBackgroundProcess(self.address, self.key) - else: # Parent process - logger.debug("Started background process, PID: %d", pid) - - def send_data(self, action, obj): - """ - This creates a new client for comms to the background process - """ - - # We want to make sure that sending out this data affects the process as little as possible - # So we run it inside a seperate thread with a timeout of 3 seconds - def target(address, key, data_array): - try: - conn = con.Client(address, authkey=key) - logger.debug("Created connection %s", conn) - for data in data_array: - conn.send(data) - conn.send(("CLOSE", {})) - conn.close() - logger.debug("Connection closed") - except Exception as e: - logger.info("Failed to send data to bg process : %s", e) - - t = Thread( - target=target, args=(self.address, self.key, [(action, obj)]), daemon=True - ) - t.start() - t.join(timeout=3) + comms = AikidoIPCCommunications(IPC_ADDRESS, generated_key_bytes) + comms.set_global() + comms.start_aikido_listener() diff --git a/aikido_firewall/background_process/aikido_background_process.py b/aikido_firewall/background_process/aikido_background_process.py new file mode 100644 index 000000000..6b2023a8b --- /dev/null +++ b/aikido_firewall/background_process/aikido_background_process.py @@ -0,0 +1,72 @@ +""" +Simply exports the aikido background process +""" + +import multiprocessing.connection as con +import os +import time +import signal +from threading import Thread +from queue import Queue +from aikido_firewall.helpers.logging import logger + +REPORT_SEC_INTERVAL = 600 # 10 minutes + + +class AikidoBackgroundProcess: + """ + Aikido's background process consists of 2 threads : + - (main) Listening thread which listens on an IPC socket for incoming data + - (spawned) reporting thread which will collect the IPC data and send it to a Reporter + """ + + def __init__(self, address, key): + logger.debug("Background process started") + try: + listener = con.Listener(address, authkey=key) + except OSError: + logger.warning( + "Aikido listener may already be running on port %s", address[1] + ) + pid = os.getpid() + os.kill(pid, signal.SIGTERM) # Kill this subprocess + self.queue = Queue() + # Start reporting thread : + Thread(target=self.reporting_thread).start() + + while True: + conn = listener.accept() + logger.debug("connection accepted from %s", listener.last_accepted) + while True: + data = conn.recv() + logger.debug("Incoming data : %s", data) + if data[0] == "ATTACK": + self.queue.put(data[1]) + elif data[0] == "CLOSE": # this is a kind of EOL for python IPC + conn.close() + break + elif ( + data[0] == "KILL" + ): # when main process quits , or during testing etc + logger.debug("Killing subprocess") + conn.close() + pid = os.getpid() + os.kill(pid, signal.SIGTERM) # Kill this subprocess + + def reporting_thread(self): + """Reporting thread""" + logger.debug("Started reporting thread") + while True: + self.send_to_reporter() + time.sleep(REPORT_SEC_INTERVAL) + + def send_to_reporter(self): + """ + Reports the found data to an Aikido server + """ + items_to_report = [] + while not self.queue.empty(): + items_to_report.append(self.queue.get()) + logger.debug("Reporting to aikido server") + logger.critical("Items to report : %s", items_to_report) + # Currently not making API calls diff --git a/aikido_firewall/background_process/comms.py b/aikido_firewall/background_process/comms.py new file mode 100644 index 000000000..5b91ccc1a --- /dev/null +++ b/aikido_firewall/background_process/comms.py @@ -0,0 +1,75 @@ +import os +import multiprocessing.connection as con +from threading import Thread +from aikido_firewall.helpers.logging import logger +from aikido_firewall.background_process.aikido_background_process import ( + AikidoBackgroundProcess, +) + +# pylint: disable=invalid-name # This variable does change +comms = None + + +def get_comms(): + """ + Returns the globally stored IPC object, which you need + to communicate to our background process. + """ + return comms + + +def reset_comms(): + """This will reset communications""" + global comms + if comms: + comms.send_data_to_bg_process("KILL", {}) + comms = None + + +class AikidoIPCCommunications: + """ + Facilitates Inter-Process communication + """ + + def __init__(self, address, key): + # The key needs to be in byte form + self.address = address + self.key = key + + # Set as global ipc object : + reset_comms() + global comms + comms = self + + def start_aikido_listener(self): + """This will start the aikido process which listens""" + pid = os.fork() + if pid == 0: # Child process + AikidoBackgroundProcess(self.address, self.key) + else: # Parent process + logger.debug("Started background process, PID: %d", pid) + + def send_data_to_bg_process(self, action, obj): + """ + This creates a new client for comms to the background process + """ + + # We want to make sure that sending out this data affects the process as little as possible + # So we run it inside a seperate thread with a timeout of 3 seconds + def target(address, key, data_array): + try: + conn = con.Client(address, authkey=key) + logger.debug("Created connection %s", conn) + for data in data_array: + conn.send(data) + conn.send(("CLOSE", {})) + conn.close() + logger.debug("Connection closed") + except Exception as e: + logger.info("Failed to send data to bg process : %s", e) + + t = Thread( + target=target, args=(self.address, self.key, [(action, obj)]), daemon=True + ) + t.start() + t.join(timeout=3) diff --git a/aikido_firewall/background_process/init_test.py b/aikido_firewall/background_process/init_test.py index 772623a45..f96653633 100644 --- a/aikido_firewall/background_process/init_test.py +++ b/aikido_firewall/background_process/init_test.py @@ -29,7 +29,7 @@ def test_start_background_process(monkeypatch): assert get_comms() == None -def test_send_data_exception(monkeypatch, caplog): +def test_send_data_to_bg_process_exception(monkeypatch, caplog): def mock_client(address, authkey): raise Exception("Connection Error") @@ -37,13 +37,13 @@ def mock_client(address, authkey): monkeypatch.setitem(globals(), "logger", caplog) ipc = IPC(("localhost", 9898), "mock_key") - ipc.send_data("ACTION", "Test Object") + ipc.send_data_to_bg_process("ACTION", "Test Object") -def test_send_data_successful(monkeypatch, caplog, mocker): +def test_send_data_to_bg_process_successful(monkeypatch, caplog, mocker): ipc = IPC(("localhost"), "mock_key") mock_client = mocker.MagicMock() monkeypatch.setattr("multiprocessing.connection.Client", mock_client) - # Call the send_data function - ipc.send_data("ACTION", {"key": "value"}) + # Call the send_data_to_bg_process function + ipc.send_data_to_bg_process("ACTION", {"key": "value"}) diff --git a/aikido_firewall/sinks/pymysql.py b/aikido_firewall/sinks/pymysql.py index a48e86420..eb07463b6 100644 --- a/aikido_firewall/sinks/pymysql.py +++ b/aikido_firewall/sinks/pymysql.py @@ -37,7 +37,7 @@ def aikido_new_query(_self, sql, unbuffered=False): logger.info("sql_injection results : %s", json.dumps(result)) if result: - get_comms().send_data("ATTACK", result) + get_comms().send_data_to_bg_process("ATTACK", result) raise Exception("SQL Injection [aikido_firewall]") return prev_query_function(_self, sql, unbuffered=False) From 4480e2bec5ccd5e8b75e71455e50457f3afa0397 Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Tue, 30 Jul 2024 10:21:58 +0200 Subject: [PATCH 2/4] Ignore global pylint error and explain threading (coomentz) --- aikido_firewall/background_process/comms.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/aikido_firewall/background_process/comms.py b/aikido_firewall/background_process/comms.py index 5b91ccc1a..ec951c6df 100644 --- a/aikido_firewall/background_process/comms.py +++ b/aikido_firewall/background_process/comms.py @@ -1,3 +1,8 @@ +""" +Holds the globally stored comms object +Exports the AikidoIPCCommunications class +""" + import os import multiprocessing.connection as con from threading import Thread @@ -20,6 +25,7 @@ def get_comms(): def reset_comms(): """This will reset communications""" + # pylint: disable=global-statement # This needs to be global global comms if comms: comms.send_data_to_bg_process("KILL", {}) @@ -38,6 +44,7 @@ def __init__(self, address, key): # Set as global ipc object : reset_comms() + # pylint: disable=global-statement # This needs to be global global comms comms = self @@ -72,4 +79,5 @@ def target(address, key, data_array): target=target, args=(self.address, self.key, [(action, obj)]), daemon=True ) t.start() + # This joins the thread for 3 seconds, afterwards the thread is forced to close (daemon=True) t.join(timeout=3) From 3dbf878b04b93c1b43a676a14f8420c690567c1b Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Tue, 30 Jul 2024 10:25:34 +0200 Subject: [PATCH 3/4] Split up testing, remove unused imports --- .../background_process/__init__.py | 9 ---- .../background_process/comms_test.py | 29 +++++++++++ .../background_process/init_test.py | 49 ------------------- 3 files changed, 29 insertions(+), 58 deletions(-) create mode 100644 aikido_firewall/background_process/comms_test.py delete mode 100644 aikido_firewall/background_process/init_test.py diff --git a/aikido_firewall/background_process/__init__.py b/aikido_firewall/background_process/__init__.py index a9230ddf5..c32cbe60d 100644 --- a/aikido_firewall/background_process/__init__.py +++ b/aikido_firewall/background_process/__init__.py @@ -3,15 +3,7 @@ and listen for data sent by our sources and sinks """ -import time -import os import secrets -import signal -import socket -import multiprocessing.connection as con -from multiprocessing import Process -from threading import Thread -from queue import Queue from aikido_firewall.helpers.logging import logger from aikido_firewall.background_process.comms import AikidoIPCCommunications @@ -27,5 +19,4 @@ def start_background_process(): generated_key_bytes = secrets.token_bytes(32) comms = AikidoIPCCommunications(IPC_ADDRESS, generated_key_bytes) - comms.set_global() comms.start_aikido_listener() diff --git a/aikido_firewall/background_process/comms_test.py b/aikido_firewall/background_process/comms_test.py new file mode 100644 index 000000000..f0898925d --- /dev/null +++ b/aikido_firewall/background_process/comms_test.py @@ -0,0 +1,29 @@ +import pytest +from aikido_firewall.background_process.comms import AikidoIPCCommunications + +def test_comms_init(): + address = ("localhost", 9898) + key = "secret_key" + comms = AikidoIPCCommunications(address, key) + + assert comms.address == address + assert comms.key == key + +def test_send_data_to_bg_process_exception(monkeypatch, caplog): + def mock_client(address, authkey): + raise Exception("Connection Error") + + monkeypatch.setitem(globals(), "Client", mock_client) + monkeypatch.setitem(globals(), "logger", caplog) + + comms = AikidoIPCCommunications(("localhost", 9898), "mock_key") + comms.send_data_to_bg_process("ACTION", "Test Object") + + +def test_send_data_to_bg_process_successful(monkeypatch, caplog, mocker): + comms = AikidoIPCCommunications(("localhost"), "mock_key") + mock_client = mocker.MagicMock() + monkeypatch.setattr("multiprocessing.connection.Client", mock_client) + + # Call the send_data_to_bg_process function + comms.send_data_to_bg_process("ACTION", {"key": "value"}) diff --git a/aikido_firewall/background_process/init_test.py b/aikido_firewall/background_process/init_test.py deleted file mode 100644 index f96653633..000000000 --- a/aikido_firewall/background_process/init_test.py +++ /dev/null @@ -1,49 +0,0 @@ -import pytest -from aikido_firewall.background_process import ( - IPC, - start_background_process, - get_comms, - IPC_ADDRESS, - reset_comms, -) - - -def test_ipc_init(): - address = ("localhost", 9898) - key = "secret_key" - ipc = IPC(address, key) - - assert ipc.address == address - assert ipc.key == key - - -def test_start_background_process(monkeypatch): - reset_comms() - assert get_comms() == None - start_background_process() - - assert get_comms() != None - assert get_comms().address == IPC_ADDRESS - - reset_comms() - assert get_comms() == None - - -def test_send_data_to_bg_process_exception(monkeypatch, caplog): - def mock_client(address, authkey): - raise Exception("Connection Error") - - monkeypatch.setitem(globals(), "Client", mock_client) - monkeypatch.setitem(globals(), "logger", caplog) - - ipc = IPC(("localhost", 9898), "mock_key") - ipc.send_data_to_bg_process("ACTION", "Test Object") - - -def test_send_data_to_bg_process_successful(monkeypatch, caplog, mocker): - ipc = IPC(("localhost"), "mock_key") - mock_client = mocker.MagicMock() - monkeypatch.setattr("multiprocessing.connection.Client", mock_client) - - # Call the send_data_to_bg_process function - ipc.send_data_to_bg_process("ACTION", {"key": "value"}) From 9968b2ed8f5bf18a14cb1fab4a507dd56189148f Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Tue, 30 Jul 2024 10:27:03 +0200 Subject: [PATCH 4/4] Linting and import comms --- aikido_firewall/background_process/__init__.py | 6 +++++- aikido_firewall/background_process/comms_test.py | 2 ++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/aikido_firewall/background_process/__init__.py b/aikido_firewall/background_process/__init__.py index c32cbe60d..b2bd76c46 100644 --- a/aikido_firewall/background_process/__init__.py +++ b/aikido_firewall/background_process/__init__.py @@ -5,7 +5,11 @@ import secrets from aikido_firewall.helpers.logging import logger -from aikido_firewall.background_process.comms import AikidoIPCCommunications +from aikido_firewall.background_process.comms import ( + AikidoIPCCommunications, + get_comms, + reset_comms, +) IPC_ADDRESS = ("localhost", 9898) # Specify the IP address and port diff --git a/aikido_firewall/background_process/comms_test.py b/aikido_firewall/background_process/comms_test.py index f0898925d..c0232db5f 100644 --- a/aikido_firewall/background_process/comms_test.py +++ b/aikido_firewall/background_process/comms_test.py @@ -1,6 +1,7 @@ import pytest from aikido_firewall.background_process.comms import AikidoIPCCommunications + def test_comms_init(): address = ("localhost", 9898) key = "secret_key" @@ -9,6 +10,7 @@ def test_comms_init(): assert comms.address == address assert comms.key == key + def test_send_data_to_bg_process_exception(monkeypatch, caplog): def mock_client(address, authkey): raise Exception("Connection Error")