Skip to content

Commit ffffe7d

Browse files
add management command to import all unit test sample scans (#12700)
* initial command * working version
1 parent 269a75f commit ffffe7d

File tree

1 file changed

+206
-0
lines changed

1 file changed

+206
-0
lines changed
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
import json
2+
import logging
3+
import os
4+
from datetime import datetime
5+
from importlib import import_module
6+
from importlib.util import find_spec
7+
from inspect import isclass
8+
from pathlib import Path
9+
10+
from django.core.management.base import BaseCommand
11+
from django.urls import reverse
12+
from django.utils import timezone
13+
from rest_framework.authtoken.models import Token
14+
from rest_framework.test import APIClient
15+
16+
import dojo.tools.factory
17+
from dojo.models import Engagement, Product, Product_Type
18+
from unittests.test_dashboard import User
19+
20+
logger = logging.getLogger(__name__)
21+
22+
23+
class Command(BaseCommand):
24+
25+
help = (
26+
"EXPERIMENTAL: May be changed/deprecated/removed without prior notice. "
27+
"Command to import all scans available in unittests folder"
28+
)
29+
30+
def add_arguments(self, parser):
31+
parser.add_argument(
32+
"--product-name-prefix",
33+
type=lambda s: s if len(s) <= 250 else parser.error("product-name-prefix must be at most 250 characters"),
34+
help="Prefix to use for product names, defaults to 'All scans <today>'. Max length 250 characters.",
35+
)
36+
parser.add_argument(
37+
"--include-very-big-scans",
38+
action="store_true",
39+
default=False,
40+
help="Include very big scans like jfrog_xray very_many_vulns.json (default: False)",
41+
)
42+
parser.add_argument("--tests-per-engagement", type=int, default=10, help="Number of tests per engagement before a new engagement is created, defaults to 10")
43+
parser.add_argument("--engagements-per-product", type=int, default=50, help="Number of engagements per product before a new product is created, defaults to 50")
44+
parser.add_argument("--products-per-product-type", type=int, default=15, help="Number of products per product type before a new product type is created, defaults to 15")
45+
parser.add_argument("--number-of-runs", type=int, default=1, help="Number of times to run the import of all sample scans, defaults to 1")
46+
47+
def get_test_admin(self, *args, **kwargs):
48+
return User.objects.get(username="admin")
49+
50+
def import_scan(self, payload, expected_http_status_code):
51+
testuser = self.get_test_admin()
52+
token = Token.objects.get(user=testuser)
53+
self.client = APIClient()
54+
self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key)
55+
56+
response = self.client.post(reverse("importscan-list"), payload)
57+
if expected_http_status_code != response.status_code:
58+
msg = f"Expected HTTP status code {expected_http_status_code}, got {response.status_code}: {response.content[:1000]}"
59+
raise AssertionError(
60+
msg,
61+
)
62+
return json.loads(response.content)
63+
64+
def import_scan_with_params(self, filename, scan_type="ZAP Scan", engagement=1, minimum_severity="Low", *, active=True, verified=False,
65+
push_to_jira=None, endpoint_to_add=None, tags=None, close_old_findings=False, group_by=None, engagement_name=None,
66+
product_name=None, product_type_name=None, auto_create_context=None, expected_http_status_code=201, test_title=None,
67+
scan_date=None, service=None, force_active=True, force_verified=True):
68+
69+
with (Path("unittests/scans") / filename).open(encoding="utf-8") as testfile:
70+
payload = {
71+
"minimum_severity": minimum_severity,
72+
"scan_type": scan_type,
73+
"file": testfile,
74+
"version": "1.0.1",
75+
"close_old_findings": close_old_findings,
76+
}
77+
78+
if active is not None:
79+
payload["active"] = active
80+
81+
if verified is not None:
82+
payload["verified"] = verified
83+
84+
if engagement:
85+
payload["engagement"] = engagement
86+
87+
if engagement_name:
88+
payload["engagement_name"] = engagement_name
89+
90+
if product_name:
91+
payload["product_name"] = product_name
92+
93+
if product_type_name:
94+
payload["product_type_name"] = product_type_name
95+
96+
if auto_create_context:
97+
payload["auto_create_context"] = auto_create_context
98+
99+
if push_to_jira is not None:
100+
payload["push_to_jira"] = push_to_jira
101+
102+
if endpoint_to_add is not None:
103+
payload["endpoint_to_add"] = endpoint_to_add
104+
105+
if tags is not None:
106+
payload["tags"] = tags
107+
108+
if group_by is not None:
109+
payload["group_by"] = group_by
110+
111+
if test_title is not None:
112+
payload["test_title"] = test_title
113+
114+
if scan_date is not None:
115+
payload["scan_date"] = scan_date
116+
117+
if service is not None:
118+
payload["service"] = service
119+
120+
return self.import_scan(payload, expected_http_status_code)
121+
122+
def import_all_unittest_scans(self, product_name_prefix=None, tests_per_engagement=10, engagements_per_product=50, products_per_product_type=15, *, include_very_big_scans=False, **kwargs):
123+
logger.info(f"product_name_prefix: {product_name_prefix}, tests_per_engagement: {tests_per_engagement}, engagements_per_product: {engagements_per_product}, products_per_product_type: {products_per_product_type}")
124+
product_type_prefix = "Sample scans " + datetime.now().strftime("%Y-%m-%d %H:%M:%S")
125+
product_type_index = 1
126+
127+
product_index = 1
128+
engagement_index = 1
129+
tests_index = 1
130+
131+
error_count = 0
132+
error_messages = {}
133+
# iterate through the modules in the current package
134+
package_dir = str(Path(dojo.tools.factory.__file__).resolve().parent)
135+
for module_name in os.listdir(package_dir): # noqa: PTH208
136+
if tests_index > tests_per_engagement:
137+
tests_index = 1
138+
engagement_index += 1
139+
140+
if engagement_index > engagements_per_product:
141+
engagement_index = 1
142+
product_index += 1
143+
144+
if product_index > products_per_product_type:
145+
product_index = 1
146+
product_type_index += 1
147+
148+
prod_type, _ = Product_Type.objects.get_or_create(name=product_type_prefix + f" {product_type_index}")
149+
prod, _ = Product.objects.get_or_create(prod_type=prod_type, name=product_name_prefix + f" {product_type_index}:{product_index}", description="Sample scans for unittesting")
150+
eng, _ = Engagement.objects.get_or_create(product=prod, name="Sample scan engagement" + f" {engagement_index}", target_start=timezone.now(), target_end=timezone.now())
151+
152+
# check if it's dir
153+
if (Path(package_dir) / module_name).is_dir():
154+
try:
155+
# check if it's a Python module
156+
if find_spec(f"dojo.tools.{module_name}.parser"):
157+
# import the module and iterate through its attributes
158+
module = import_module(f"dojo.tools.{module_name}.parser")
159+
for attribute_name in dir(module):
160+
attribute = getattr(module, attribute_name)
161+
if isclass(attribute) and attribute_name.lower() == module_name.replace("_", "") + "parser":
162+
logger.debug(f"Loading {module_name} parser")
163+
scan_dir = Path("unittests") / "scans" / module_name
164+
for scan_file in scan_dir.glob("*.json"):
165+
if include_very_big_scans or scan_file.name != "very_many_vulns.json": # jfrog_xray file is huge and takes too long to import
166+
try:
167+
logger.info(f"Importing scan {scan_file.name} using {module_name} parser into {prod.name}:{eng.name}")
168+
parser = attribute()
169+
# with scan_file.open(encoding="utf-8") as f:
170+
# findings = parser.get_findings(f, Test())
171+
result = self.import_scan_with_params(
172+
filename=module_name + "/" + scan_file.name,
173+
scan_type=parser.get_scan_types()[0],
174+
engagement=eng.id,
175+
)
176+
# logger.debug(f"Result of import: {result}")
177+
# raise Exception(f"Scan {scan_file.name} is not expected to be imported, but it was.")
178+
logger.debug(f"Imported findings from {module_name + scan_file.name}")
179+
tests_index += 1
180+
except Exception as e:
181+
logger.error(f"Error importing scan {module_name + scan_file.name}: {e}")
182+
error_count += 1
183+
error_messages[module_name + "/" + scan_file.name] = result.get("message", str(e))
184+
185+
except:
186+
logger.exception(f"failed to load {module_name}")
187+
raise
188+
189+
logger.error(f"Error count: {error_count}")
190+
for scan, message in error_messages.items():
191+
logger.error(f"Error importing scan {scan}: {message}")
192+
193+
def handle(self, *args, **options):
194+
logger.info("EXPERIMENTAL: This command may be changed/deprecated/removed without prior notice.")
195+
for i in range(options.get("number_of_runs", 1)):
196+
product_name_prefix = options.get("product_name_prefix")
197+
if not product_name_prefix:
198+
today = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
199+
product_name_prefix = f"Sample scan product {i + 1} {today}"
200+
self.import_all_unittest_scans(
201+
product_name_prefix=product_name_prefix,
202+
tests_per_engagement=options.get("tests_per_engagement"),
203+
engagements_per_product=options.get("engagements_per_product"),
204+
products_per_product_type=options.get("products_per_product_type"),
205+
include_very_big_scans=options.get("include_very_big_scans"),
206+
)

0 commit comments

Comments
 (0)