diff --git a/aikido_zen/sources/django/run_init_stage.py b/aikido_zen/sources/django/run_init_stage.py
index 4b865c1ae..6b8625b16 100644
--- a/aikido_zen/sources/django/run_init_stage.py
+++ b/aikido_zen/sources/django/run_init_stage.py
@@ -10,20 +10,20 @@ def run_init_stage(request):
"""Parse request and body, run "init" stage with request_handler"""
body = None
try:
- # try-catch loading of form parameters, this is to fix issue with DATA_UPLOAD_MAX_NUMBER_FIELDS :
try:
- body = request.POST.dict()
- if len(body) == 0:
- body = None # Reset
+ body = json.loads(request.body)
except Exception:
pass
- # Check for JSON or XML :
- if body is None and request.content_type == "application/json":
+ if body is None or len(body) == 0:
+ # try-catch loading of form parameters, this is to fix issue with DATA_UPLOAD_MAX_NUMBER_FIELDS :
try:
- body = json.loads(request.body)
+ body = request.POST.dict()
+ if len(body) == 0:
+ body = None # Reset
except Exception:
pass
+
if body is None or len(body) == 0:
# E.g. XML Data
body = request.body
diff --git a/aikido_zen/sources/django/run_init_stage_test.py b/aikido_zen/sources/django/run_init_stage_test.py
index 7afbf5275..60796336b 100644
--- a/aikido_zen/sources/django/run_init_stage_test.py
+++ b/aikido_zen/sources/django/run_init_stage_test.py
@@ -40,6 +40,18 @@ def mock_request():
return request
+@pytest.fixture
+def mock_request_form_body():
+ """Fixture to create a mock request object."""
+ request = MagicMock()
+ request.POST.dict.return_value = {"a": [1, 2], "b": [2, 3]}
+ request.content_type = "application/x-www-form-urlencoded"
+ request.body = "a[0]=1&a[1]=2&b[0]=2&b[1]=3" # Example JSON body
+ request.META = wsgi_request
+ request.scope = None
+ return request
+
+
@pytest.fixture(autouse=True)
def run_around_tests():
yield
@@ -57,10 +69,9 @@ def test_run_init_stage_with_json(mock_request):
assert {"key": "value"} == context.body
-def test_run_init_stage_with_dict(mock_request):
+def test_run_init_stage_with_dict(mock_request_form_body):
"""Test run_init_stage with a JSON request."""
- mock_request.POST.dict.return_value = {"a": [1, 2], "b": [2, 3]}
- run_init_stage(mock_request)
+ run_init_stage(mock_request_form_body)
# Assertions
context: Context = get_current_context()
@@ -120,6 +131,36 @@ def test_run_init_stage_with_empty_body_string(mock_request):
assert context.body is None
+def test_run_init_stage_with_json_wrong_content_type(mock_request):
+ """Test run_init_stage with an XML request."""
+ mock_request.content_type = "application/x-www-form-urlencoded"
+ mock_request.body = '{"key": "value"}' # Example XML body
+ run_init_stage(mock_request)
+ # Assertions
+ context: Context = get_current_context()
+ assert context.body == {"key": "value"}
+
+
+def test_run_init_stage_with_xml_wrong_content_type(mock_request):
+ """Test run_init_stage with an XML request."""
+ mock_request.content_type = "application/json"
+ mock_request.body = "value" # Example XML body
+ run_init_stage(mock_request)
+ # Assertions
+ context: Context = get_current_context()
+ assert context.body == "value"
+
+
+def test_run_init_stage_with_xml_wrong_content_type_form_urlencoded(mock_request):
+ """Test run_init_stage with an XML request."""
+ mock_request.content_type = "application/x-www-form-urlencoded"
+ mock_request.body = "value=2" # Example XML body
+ run_init_stage(mock_request)
+ # Assertions
+ context: Context = get_current_context()
+ assert context.body == "value=2"
+
+
def test_run_init_stage_with_xml(mock_request):
"""Test run_init_stage with an XML request."""
mock_request.content_type = "application/xml"
diff --git a/aikido_zen/sources/quart.py b/aikido_zen/sources/quart.py
index a2057362f..cf0636963 100644
--- a/aikido_zen/sources/quart.py
+++ b/aikido_zen/sources/quart.py
@@ -36,14 +36,20 @@ async def handle_request_wrapper(former_handle_request, quart_app, req):
try:
context = get_current_context()
if context:
+ body = None
+ try:
+ body = await req.get_json(force=True)
+ except Exception:
+ pass
+
form = await req.form
- if req.is_json:
- context.set_body(await req.get_json())
- elif form:
- context.set_body(form)
+ if form and body is None:
+ body = form
else:
data = await req.data
- context.set_body(data.decode("utf-8"))
+ body = data.decode("utf-8")
+
+ context.set_body(body)
context.cookies = req.cookies.to_dict()
context.set_as_current_context()
except Exception as e:
diff --git a/end2end/django_mysql_test.py b/end2end/django_mysql_test.py
index 7e499be4d..efe90be0b 100644
--- a/end2end/django_mysql_test.py
+++ b/end2end/django_mysql_test.py
@@ -1,6 +1,7 @@
import time
import pytest
import requests
+import json
from .server.check_events_from_mock import fetch_events_from_mock, validate_started_event, filter_on_event_type, validate_heartbeat
# e2e tests for django_mysql sample app
@@ -45,6 +46,32 @@ def test_dangerous_response_with_firewall():
'user': None
}
+def test_dangerous_response_with_form_header_but_json_body():
+ headers = {"Content-Type": "application/x-www-form-urlencoded"}
+ json_body = json.dumps({"dog_name": 'Dangerous bobby", 1); -- '})
+
+ res = requests.post(base_url_fw + "/json/create", headers=headers, data=json_body)
+ assert res.status_code == 500
+ time.sleep(5)
+
+ events = fetch_events_from_mock("http://localhost:5000")
+ attacks = filter_on_event_type(events, "detected_attack")
+
+ assert len(attacks) == 2
+ del attacks[1]["attack"]["stack"]
+ assert attacks[1]["attack"] == {
+ "blocked": True,
+ "kind": "sql_injection",
+ "metadata": {
+ "sql": 'INSERT INTO sample_app_dogs (dog_name, dog_boss) VALUES ("Dangerous bobby", 1); -- ", "N/A")'
+ },
+ "operation": "MySQLdb.Cursor.execute",
+ "pathToPayload": ".dog_name",
+ "payload": '"Dangerous bobby\\", 1); -- "',
+ "source": "body",
+ "user": None,
+ }
+
def test_dangerous_response_with_firewall_shell():
dog_name = 'Dangerous bobby", 1); -- '
res = requests.get(base_url_fw + "/shell/ls -la")
@@ -53,10 +80,9 @@ def test_dangerous_response_with_firewall_shell():
events = fetch_events_from_mock("http://localhost:5000")
attacks = filter_on_event_type(events, "detected_attack")
- assert len(attacks) == 2
- del attacks[0] # Previous attack
- del attacks[0]["attack"]["stack"]
- assert attacks[0]["attack"] == {
+ assert len(attacks) == 3
+ del attacks[2]["attack"]["stack"]
+ assert attacks[2]["attack"] == {
"blocked": True,
"kind": "shell_injection",
'metadata': {'command': 'ls -la'},
@@ -85,5 +111,5 @@ def test_initial_heartbeat():
"method": "POST",
"path": "/app/create"
}],
- {"aborted":0,"attacksDetected":{"blocked":2,"total":2},"total":0}
+ {"aborted":0,"attacksDetected":{"blocked":3,"total":3},"total":0}
)
diff --git a/end2end/quart_postgres_uvicorn_test.py b/end2end/quart_postgres_uvicorn_test.py
index ebea4a9ab..75d13ea17 100644
--- a/end2end/quart_postgres_uvicorn_test.py
+++ b/end2end/quart_postgres_uvicorn_test.py
@@ -1,5 +1,6 @@
import time
import pytest
+import json
import requests
from .server.check_events_from_mock import fetch_events_from_mock, validate_started_event, filter_on_event_type
@@ -45,6 +46,31 @@ def test_dangerous_response_with_firewall():
assert attacks[0]["attack"]["user"]["id"] == "user123"
assert attacks[0]["attack"]["user"]["name"] == "John Doe"
+def test_dangerous_response_with_form_header_but_json_body():
+ headers = {"Content-Type": "application/x-www-form-urlencoded"}
+ json_body = json.dumps({"dog_name": "Dangerous bobby', 1); -- "})
+
+ res = requests.post(post_url_fw + "/json", headers=headers, data=json_body)
+ assert res.status_code == 500
+ time.sleep(5)
+
+ events = fetch_events_from_mock("http://localhost:5000")
+ attacks = filter_on_event_type(events, "detected_attack")
+
+ assert len(attacks) == 2
+ del attacks[1]["attack"]["stack"]
+ assert attacks[1]["attack"] == {
+ "blocked": True,
+ "kind": "sql_injection",
+ "metadata": {
+ "sql": "INSERT INTO dogs (dog_name, isAdmin) VALUES ('Dangerous bobby', 1); -- ', FALSE)"
+ },
+ "operation": "asyncpg.connection.Connection.execute",
+ "pathToPayload": ".dog_name",
+ "payload": '"Dangerous bobby\', 1); -- "',
+ "source": "body",
+ "user": {'id': 'user123', 'lastIpAddress': '127.0.0.1', 'name': 'John Doe'},
+ }
def test_dangerous_response_without_firewall():
dog_name = "Dangerous Bobby', TRUE); -- "
diff --git a/sample-apps/django-mysql/sample_app/urls.py b/sample-apps/django-mysql/sample_app/urls.py
index 296442ee1..c7be2b8e8 100644
--- a/sample-apps/django-mysql/sample_app/urls.py
+++ b/sample-apps/django-mysql/sample_app/urls.py
@@ -5,6 +5,7 @@
urlpatterns = [
path("", views.index, name="index"),
path("dogpage/", views.dog_page, name="dog_page"),
+ path("json/create", views.json_create_dog, name="json_create"),
path("shell/", views.shell_url, name="shell"),
path("create", views.create_dogpage, name="create")
]
diff --git a/sample-apps/django-mysql/sample_app/views.py b/sample-apps/django-mysql/sample_app/views.py
index 37e45702d..1eb7721ea 100644
--- a/sample-apps/django-mysql/sample_app/views.py
+++ b/sample-apps/django-mysql/sample_app/views.py
@@ -1,11 +1,12 @@
from django.shortcuts import render, get_object_or_404
-from django.http import HttpResponse
+from django.http import HttpResponse, JsonResponse
from django.template import loader
from .models import Dogs
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
# Create your views here.
import subprocess
+import json
def index(request):
dogs = Dogs.objects.all()
@@ -37,3 +38,15 @@ def create_dogpage(request):
print("QUERY : ", query)
cursor.execute(query)
return HttpResponse("Dog page created")
+
+@csrf_exempt
+def json_create_dog(request):
+ if request.method == 'POST':
+ body = request.body.decode('utf-8')
+ body_json = json.loads(body)
+ dog_name = body_json.get('dog_name')
+
+ with connection.cursor() as cursor:
+ query = 'INSERT INTO sample_app_dogs (dog_name, dog_boss) VALUES ("%s", "N/A")' % dog_name
+ cursor.execute(query)
+ return JsonResponse({"status": "Dog page created"})
\ No newline at end of file
diff --git a/sample-apps/quart-postgres-uvicorn/app.py b/sample-apps/quart-postgres-uvicorn/app.py
index 65b59725b..e02d48b1e 100644
--- a/sample-apps/quart-postgres-uvicorn/app.py
+++ b/sample-apps/quart-postgres-uvicorn/app.py
@@ -64,6 +64,24 @@ async def create_dog():
return jsonify({"message": f'Dog {dog_name} created successfully'}), 201
+@app.route("/create/json", methods=['POST'])
+async def create_dog_json():
+ data = await request.get_json(force=True)
+ dog_name = data.get('dog_name')
+
+ if not dog_name:
+ return jsonify({"error": "dog_name is required"}), 400
+
+ conn = await get_db_connection()
+ try:
+ await conn.execute(
+ f"INSERT INTO dogs (dog_name, isAdmin) VALUES ('%s', FALSE)" % dog_name
+ )
+ finally:
+ await conn.close()
+
+ return jsonify({"message": f'Dog {dog_name} created successfully'}), 201
+
@app.route("/create_many", methods=['POST'])
async def create_dog_many():
data = await request.form