diff --git a/dojo/metrics/views.py b/dojo/metrics/views.py
index 3efdb82a969..24c68381806 100644
--- a/dojo/metrics/views.py
+++ b/dojo/metrics/views.py
@@ -3,15 +3,12 @@
import logging
import operator
from calendar import monthrange
-from collections import OrderedDict
from datetime import date, datetime, timedelta
-from functools import reduce
-from operator import itemgetter
from dateutil.relativedelta import relativedelta
from django.contrib import messages
from django.core.exceptions import PermissionDenied
-from django.db.models import Case, Count, IntegerField, Q, Sum, Value, When
+from django.db.models import Case, Count, F, IntegerField, Q, Sum, Value, When
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
@@ -35,7 +32,7 @@
identify_view,
severity_count,
)
-from dojo.models import Dojo_User, Engagement, Finding, Product, Product_Type, Risk_Acceptance, Test
+from dojo.models import Dojo_User, Finding, Product, Product_Type, Risk_Acceptance
from dojo.product.queries import get_authorized_products
from dojo.product_type.queries import get_authorized_product_types
from dojo.utils import (
@@ -680,307 +677,255 @@ def engineer_metrics(request):
"""
-# noinspection DjangoOrm
-@cache_page(60 * 5) # cache for 5 minutes
@vary_on_cookie
def view_engineer(request, eid):
user = get_object_or_404(Dojo_User, pk=eid)
- if not (request.user.is_superuser
- or request.user.username == user.username):
+ if not (request.user.is_superuser or request.user.username == user.username):
raise PermissionDenied
+
now = timezone.now()
+ tz = now.tzinfo
- if get_system_setting("enforce_verified_status", True) or get_system_setting("enforce_verified_status_metrics", True):
- findings = Finding.objects.filter(reporter=user, verified=True)
- else:
- findings = Finding.objects.filter(reporter=user)
+ # ---------------
+ # Base query-sets
+ reporter_findings = Finding.objects.filter(reporter=user)
+ if get_system_setting("enforce_verified_status", True) or get_system_setting(
+ "enforce_verified_status_metrics", True,
+ ):
+ reporter_findings = reporter_findings.filter(verified=True)
closed_findings = Finding.objects.filter(mitigated_by=user)
- open_findings = findings.exclude(mitigated__isnull=False)
- open_month = findings.filter(date__year=now.year, date__month=now.month)
- accepted_month = [finding for ra in Risk_Acceptance.objects.filter(
- created__range=[datetime(now.year,
- now.month, 1,
- tzinfo=timezone.get_current_timezone()),
- datetime(now.year,
- now.month,
- monthrange(now.year,
- now.month)[1],
- tzinfo=timezone.get_current_timezone())],
- owner=user)
- for finding in ra.accepted_findings.all()]
- closed_month = [f for f in closed_findings
- if f.mitigated and f.mitigated.year == now.year and f.mitigated.month == now.month]
+ open_findings = (
+ reporter_findings.filter(mitigated__isnull=True)
+ .select_related("test__engagement__product__prod_type", "reporter")
+ .prefetch_related("risk_acceptance_set")
+ )
+
+ # --------------------
+ # Month & week buckets
+ month_start = datetime(now.year, now.month, 1, tzinfo=tz)
+ month_end = month_start + relativedelta(months=1) # first day of next month (exclusive)
+
+ open_month = reporter_findings.filter(date__gte=month_start, date__lt=month_end)
+ closed_month = closed_findings.filter(mitigated__gte=month_start, mitigated__lt=month_end)
+ accepted_month = (
+ Finding.objects.filter(
+ risk_acceptance__owner=user,
+ risk_acceptance__created__gte=month_start,
+ risk_acceptance__created__lt=month_end,
+ ).distinct()
+ )
+
+ week_start = (now - timedelta(days=now.weekday())).replace(hour=0, minute=0, second=0, microsecond=0)
+ week_end = week_start + timedelta(days=7) # next Monday 00:00 (exclusive)
+ open_week = reporter_findings.filter(date__gte=week_start, date__lt=week_end)
+ closed_week = closed_findings.filter(mitigated__gte=week_start, mitigated__lt=week_end)
+ accepted_week = (
+ Finding.objects.filter(
+ risk_acceptance__owner=user,
+ risk_acceptance__created__gte=week_start,
+ risk_acceptance__created__lt=week_end,
+ ).distinct()
+ )
o_dict, open_count = count_findings(open_month)
c_dict, closed_count = count_findings(closed_month)
a_dict, accepted_count = count_findings(accepted_month)
- day_list = [now - relativedelta(weeks=1,
- weekday=x,
- hour=0,
- minute=0,
- second=0)
- for x in range(now.weekday())]
- day_list.append(now)
-
- q_objects = (Q(date=d) for d in day_list)
- open_week = findings.filter(reduce(operator.or_, q_objects))
-
- accepted_week = [finding for ra in Risk_Acceptance.objects.filter(
- owner=user, created__range=[day_list[0], day_list[-1]])
- for finding in ra.accepted_findings.all()]
-
- q_objects = (Q(mitigated=d) for d in day_list)
- # closed_week= findings.filter(reduce(operator.or_, q_objects))
- closed_week = [f for f in closed_findings if f.mitigated and f.mitigated >= day_list[0]]
-
o_week_dict, open_week_count = count_findings(open_week)
c_week_dict, closed_week_count = count_findings(closed_week)
a_week_dict, accepted_week_count = count_findings(accepted_week)
- stuff = []
- o_stuff = []
- a_stuff = []
- findings_this_period(findings, 1, stuff, o_stuff, a_stuff)
- # findings_this_period no longer fits the need for accepted findings
- # however will use its week finding output to use here
- for month in a_stuff:
- month_start = datetime.strptime(
- month[0].strip(), "%b %Y")
- month_end = datetime(month_start.year,
- month_start.month,
- monthrange(
- month_start.year,
- month_start.month)[1],
- tzinfo=timezone.get_current_timezone())
- for finding in [finding for ra in Risk_Acceptance.objects.filter(
- created__range=[month_start, month_end], owner=user)
- for finding in ra.accepted_findings.all()]:
- if finding.severity == "Critical":
- month[1] += 1
- if finding.severity == "High":
- month[2] += 1
- if finding.severity == "Medium":
- month[3] += 1
- if finding.severity == "Low":
- month[4] += 1
-
- month[5] = sum(month[1:])
- week_stuff = []
- week_o_stuff = []
- week_a_stuff = []
- findings_this_period(findings, 0, week_stuff, week_o_stuff, week_a_stuff)
-
- # findings_this_period no longer fits the need for accepted findings
- # however will use its week finding output to use here
- for week in week_a_stuff:
- wk_range = week[0].split("-")
- week_start = datetime.strptime(
- wk_range[0].strip() + " " + str(now.year), "%b %d %Y")
- week_end = datetime.strptime(
- wk_range[1].strip() + " " + str(now.year), "%b %d %Y")
-
- for finding in [finding for ra in Risk_Acceptance.objects.filter(
- created__range=[week_start, week_end], owner=user)
- for finding in ra.accepted_findings.all()]:
- if finding.severity == "Critical":
- week[1] += 1
- if finding.severity == "High":
- week[2] += 1
- if finding.severity == "Medium":
- week[3] += 1
- if finding.severity == "Low":
- week[4] += 1
-
- week[5] = sum(week[1:])
-
- products = get_authorized_products(Permissions.Product_Type_View)
- vulns = {}
- for product in products:
- f_count = 0
- engs = Engagement.objects.filter(product=product)
- for eng in engs:
- tests = Test.objects.filter(engagement=eng)
- for test in tests:
- f_count += findings.filter(test=test,
- mitigated__isnull=True,
- active=True).count()
- vulns[product.id] = f_count
- od = OrderedDict(sorted(vulns.items(), key=itemgetter(1)))
- items = list(od.items())
- items.reverse()
- top = items[: 10]
- update = []
- for t in top:
- product = t[0]
- z_count = 0
- o_count = 0
- t_count = 0
- h_count = 0
- engs = Engagement.objects.filter(
- product=Product.objects.get(id=product))
- for eng in engs:
- tests = Test.objects.filter(engagement=eng)
- for test in tests:
- z_count += findings.filter(
- test=test,
- mitigated__isnull=True,
- severity="Critical",
- ).count()
- o_count += findings.filter(
- test=test,
- mitigated__isnull=True,
- severity="High",
- ).count()
- t_count += findings.filter(
- test=test,
- mitigated__isnull=True,
- severity="Medium",
- ).count()
- h_count += findings.filter(
- test=test,
- mitigated__isnull=True,
- severity="Low",
- ).count()
- prod = Product.objects.get(id=product)
-        all_findings_link = "<a href='{}'>{}</a>".format(
- reverse("product_open_findings", args=(prod.id,)), escape(prod.name))
- update.append([all_findings_link, z_count, o_count, t_count, h_count,
- z_count + o_count + t_count + h_count])
- total_update = []
- for i in items:
- product = i[0]
- z_count = 0
- o_count = 0
- t_count = 0
- h_count = 0
- engs = Engagement.objects.filter(
- product=Product.objects.get(id=product))
- for eng in engs:
- tests = Test.objects.filter(engagement=eng)
- for test in tests:
- z_count += findings.filter(
- test=test,
- mitigated__isnull=True,
- severity="Critical").count()
- o_count += findings.filter(
- test=test,
- mitigated__isnull=True,
- severity="High").count()
- t_count += findings.filter(
- test=test,
- mitigated__isnull=True,
- severity="Medium").count()
- h_count += findings.filter(
- test=test,
- mitigated__isnull=True,
- severity="Low").count()
- prod = Product.objects.get(id=product)
-        all_findings_link = "<a href='{}'>{}</a>".format(
- reverse("product_open_findings", args=(prod.id,)), escape(prod.name))
- total_update.append([all_findings_link, z_count, o_count, t_count,
- h_count, z_count + o_count + t_count + h_count])
-
- neg_length = len(stuff)
- findz = findings.filter(mitigated__isnull=True, active=True,
- risk_acceptance=None)
- findz = findz.filter(Q(severity="Critical") | Q(severity="High"))
- less_thirty = 0
- less_sixty = 0
- less_nine = 0
- more_nine = 0
- for finding in findz:
- elapsed = date.today() - finding.date
- if elapsed <= timedelta(days=30):
- less_thirty += 1
- elif elapsed <= timedelta(days=60):
- less_sixty += 1
- elif elapsed <= timedelta(days=90):
- less_nine += 1
- else:
- more_nine += 1
-
- # Data for the monthly charts
- chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"]]
- for thing in o_stuff:
- chart_data.insert(1, thing)
-
- a_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"]]
- for thing in a_stuff:
- a_chart_data.insert(1, thing)
-
- # Data for the weekly charts
- week_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"]]
- for thing in week_o_stuff:
- week_chart_data.insert(1, thing)
-
- week_a_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"]]
- for thing in week_a_stuff:
- week_a_chart_data.insert(1, thing)
-
- details = []
- for find in open_findings:
- team = find.test.engagement.product.prod_type.name
- name = find.test.engagement.product.name
- severity = find.severity
- description = find.title
- life = date.today() - find.date
- life = life.days
- status = "Active"
- if find.risk_accepted:
- status = "Accepted"
- detail = [team, name, severity, description, life, status, find.reporter]
- details.append(detail)
-
- details = sorted(details, key=itemgetter(2))
+ # --------------------------
+ # Historic series for charts
+ monthly_total_series, monthly_open_series, monthly_accepted_series = [], [], []
+ findings_this_period(reporter_findings, 1, monthly_total_series, monthly_open_series, monthly_accepted_series)
+
+ weekly_total_series, weekly_open_series, weekly_accepted_series = [], [], []
+ findings_this_period(reporter_findings, 0, weekly_total_series, weekly_open_series, weekly_accepted_series)
+
+ ras_owner_qs = Risk_Acceptance.objects.filter(owner=user)
+ _augment_series_with_accepted(monthly_accepted_series, ras_owner_qs, period="month", tz=tz)
+ _augment_series_with_accepted(weekly_accepted_series, ras_owner_qs, period="week", tz=tz)
+
+ chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"], *monthly_open_series]
+ a_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"], *monthly_accepted_series]
+ week_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"], *weekly_open_series]
+ week_a_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"], *weekly_accepted_series]
+
+ # --------------
+ # Product tables
+ products = list(get_authorized_products(Permissions.Product_Type_View).only("id", "name"))
+ update, total_update = _product_stats(products)
+
+ # ----------------------------------
+ # Age buckets for open critical/high
+ high_crit_open = reporter_findings.filter(
+ mitigated__isnull=True,
+ active=True,
+ risk_acceptance=None,
+ severity__in=["Critical", "High"],
+ )
+ age_buckets = _age_buckets(high_crit_open)
+
+ # -------------
+ # Details table
+ details = sorted(
+ (
+ [
+ f.test.engagement.product.prod_type.name,
+ f.test.engagement.product.name,
+ f.severity,
+ f.title,
+ (date.today() - f.date).days,
+ "Accepted" if f.risk_accepted else "Active",
+ f.reporter,
+ ]
+ for f in open_findings
+ ),
+ key=operator.itemgetter(2),
+ )
add_breadcrumb(title=f"{user.get_full_name()} Metrics", top_level=False, request=request)
- return render(request, "dojo/view_engineer.html", {
- "open_month": open_month,
- "a_month": accepted_month,
- "low_a_month": accepted_count["low"],
- "medium_a_month": accepted_count["med"],
- "high_a_month": accepted_count["high"],
- "critical_a_month": accepted_count["crit"],
- "closed_month": closed_month,
- "low_open_month": open_count["low"],
- "medium_open_month": open_count["med"],
- "high_open_month": open_count["high"],
- "critical_open_month": open_count["crit"],
- "low_c_month": closed_count["low"],
- "medium_c_month": closed_count["med"],
- "high_c_month": closed_count["high"],
- "critical_c_month": closed_count["crit"],
- "week_stuff": week_stuff,
- "week_a_stuff": week_a_stuff,
- "a_total": a_stuff,
- "total": stuff,
- "sub": neg_length,
- "update": update,
- "lt": less_thirty,
- "ls": less_sixty,
- "ln": less_nine,
- "mn": more_nine,
- "chart_data": chart_data,
- "a_chart_data": a_chart_data,
- "week_chart_data": week_chart_data,
- "week_a_chart_data": week_a_chart_data,
- "name": f"{user.get_full_name()} Metrics",
- "metric": True,
- "total_update": total_update,
- "details": details,
- "open_week": open_week,
- "closed_week": closed_week,
- "accepted_week": accepted_week,
- "a_dict": a_dict,
- "o_dict": o_dict,
- "c_dict": c_dict,
- "o_week_dict": o_week_dict,
- "a_week_dict": a_week_dict,
- "c_week_dict": c_week_dict,
- "open_week_count": open_week_count,
- "accepted_week_count": accepted_week_count,
- "closed_week_count": closed_week_count,
- "user": request.user,
- })
+ return render(
+ request,
+ "dojo/view_engineer.html",
+ {
+ # month
+ "open_month": open_month,
+ "a_month": accepted_month,
+ "low_a_month": accepted_count["low"],
+ "medium_a_month": accepted_count["med"],
+ "high_a_month": accepted_count["high"],
+ "critical_a_month": accepted_count["crit"],
+ "closed_month": closed_month,
+ "low_open_month": open_count["low"],
+ "medium_open_month": open_count["med"],
+ "high_open_month": open_count["high"],
+ "critical_open_month": open_count["crit"],
+ "low_c_month": closed_count["low"],
+ "medium_c_month": closed_count["med"],
+ "high_c_month": closed_count["high"],
+ "critical_c_month": closed_count["crit"],
+ # week
+ "week_stuff": weekly_total_series,
+ "week_a_stuff": weekly_accepted_series,
+ # series
+ "a_total": monthly_accepted_series,
+ "total": monthly_total_series,
+ "sub": len(monthly_total_series),
+ # product tables
+ "update": update,
+ "total_update": total_update,
+ # aged buckets
+ "lt": age_buckets["lt"],
+ "ls": age_buckets["ls"],
+ "ln": age_buckets["ln"],
+ "mn": age_buckets["mn"],
+ # charts
+ "chart_data": chart_data,
+ "a_chart_data": a_chart_data,
+ "week_chart_data": week_chart_data,
+ "week_a_chart_data": week_a_chart_data,
+ # misc
+ "name": f"{user.get_full_name()} Metrics",
+ "metric": True,
+ "details": details,
+ "open_week": open_week,
+ "closed_week": closed_week,
+ "accepted_week": accepted_week,
+ "a_dict": a_dict,
+ "o_dict": o_dict,
+ "c_dict": c_dict,
+ "o_week_dict": o_week_dict,
+ "a_week_dict": a_week_dict,
+ "c_week_dict": c_week_dict,
+ "open_week_count": open_week_count,
+ "accepted_week_count": accepted_week_count,
+ "closed_week_count": closed_week_count,
+ "user": request.user,
+ },
+ )
+
+
+def _age_buckets(qs):
+ """Return aged high/critical finding counts in one SQL round-trip."""
+ today = date.today()
+ return qs.aggregate(
+ lt=Count("id", filter=Q(date__gte=today - timedelta(days=30))),
+ ls=Count("id", filter=Q(date__lte=today - timedelta(days=30), date__gt=today - timedelta(days=60))),
+ ln=Count("id", filter=Q(date__lte=today - timedelta(days=60), date__gt=today - timedelta(days=90))),
+ mn=Count("id", filter=Q(date__lte=today - timedelta(days=90))),
+ )
+
+
+def _augment_series_with_accepted(series: list[list], ras_qs, *, period: str, tz):
+ """Mutate `series` in-place, adding per-severity counts for accepted findings."""
+ if not series: # no buckets to augment
+ return
+
+ first_ra = ras_qs.first()
+ if first_ra is None: # engineer has no risk acceptances at all
+ return
+
+ owner = first_ra.owner
+ sev_idx = {"Critical": 1, "High": 2, "Medium": 3, "Low": 4}
+
+ for bucket in series:
+ if period == "month":
+ start = datetime.strptime(bucket[0].strip(), "%b %Y").replace(tzinfo=tz)
+ end = start + relativedelta(months=1) # first day of next month (exclusive)
+ else: # "week"
+ wk_a, _ = (d.strip() for d in bucket[0].split("-"))
+ year = timezone.now().year
+ start = datetime.strptime(f"{wk_a} {year}", "%b %d %Y").replace(tzinfo=tz)
+ end = start + timedelta(days=7) # next Monday 00:00 (exclusive)
+
+ accepted = (
+ Finding.objects.filter(
+ risk_acceptance__owner=owner,
+ risk_acceptance__created__gte=start,
+ risk_acceptance__created__lt=end,
+ )
+ .values("severity")
+ .annotate(cnt=Count("id"))
+ )
+
+ for row in accepted:
+ bucket[sev_idx[row["severity"]]] += row["cnt"]
+
+ bucket[5] = sum(bucket[1:])
+
+
+def _product_stats(products) -> tuple[list, list]:
+ """
+ Return two tables:
+ * `update` - top-10 products by open findings
+ * `total_update` - all authorized products
+ """
+ counts = (
+ Finding.objects.filter(test__engagement__product__in=products, mitigated__isnull=True, active=True)
+ .values(pid=F("test__engagement__product"))
+ .annotate(
+ critical=Count("id", filter=Q(severity="Critical")),
+ high=Count("id", filter=Q(severity="High")),
+ medium=Count("id", filter=Q(severity="Medium")),
+ low=Count("id", filter=Q(severity="Low")),
+ total=Count("id"),
+ )
+ )
+ by_id = {c["pid"]: c for c in counts}
+ top10 = sorted(by_id.items(), key=lambda kv: kv[1]["total"], reverse=True)[:10]
+
+ product_lookup = {p.id: p for p in products}
+
+ def row(prod_id):
+ prod = product_lookup[prod_id]
+        link = f"<a href='{reverse('product_open_findings', args=(prod.id,))}'>{escape(prod.name)}</a>"
+ data = by_id[prod_id]
+ return [link, data["critical"], data["high"], data["medium"], data["low"], data["total"]]
+
+ update = [row(pid) for pid, _ in top10]
+ total_update = [row(p.id) for p in products if p.id in by_id]
+
+ return update, total_update
diff --git a/dojo/utils.py b/dojo/utils.py
index b6fba9a5eb9..e3b89954cf7 100644
--- a/dojo/utils.py
+++ b/dojo/utils.py
@@ -30,7 +30,7 @@
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.contrib.contenttypes.models import ContentType
from django.core.paginator import Paginator
-from django.db.models import Case, Count, IntegerField, Q, Subquery, Sum, Value, When
+from django.db.models import Case, Count, F, IntegerField, Q, Subquery, Sum, Value, When
from django.db.models.query import QuerySet
from django.db.models.signals import post_save
from django.dispatch import receiver
@@ -578,40 +578,31 @@ def set_duplicate_reopen(new_finding, existing_finding):
existing_finding.save()
-def count_findings(findings):
- product_count = {}
- finding_count = {"low": 0, "med": 0, "high": 0, "crit": 0}
- for f in findings:
- product = f.test.engagement.product
- if product in product_count:
- product_count[product][4] += 1
- if f.severity == "Low":
- product_count[product][3] += 1
- finding_count["low"] += 1
- if f.severity == "Medium":
- product_count[product][2] += 1
- finding_count["med"] += 1
- if f.severity == "High":
- product_count[product][1] += 1
- finding_count["high"] += 1
- if f.severity == "Critical":
- product_count[product][0] += 1
- finding_count["crit"] += 1
- else:
- product_count[product] = [0, 0, 0, 0, 0]
- product_count[product][4] += 1
- if f.severity == "Low":
- product_count[product][3] += 1
- finding_count["low"] += 1
- if f.severity == "Medium":
- product_count[product][2] += 1
- finding_count["med"] += 1
- if f.severity == "High":
- product_count[product][1] += 1
- finding_count["high"] += 1
- if f.severity == "Critical":
- product_count[product][0] += 1
- finding_count["crit"] += 1
+def count_findings(findings: QuerySet) -> tuple[dict["Product", list[int]], dict[str, int]]:
+ agg = (
+ findings.values(prod_id=F("test__engagement__product_id"))
+ .annotate(
+ crit=Count("id", filter=Q(severity="Critical")),
+ high=Count("id", filter=Q(severity="High")),
+ med=Count("id", filter=Q(severity="Medium")),
+ low=Count("id", filter=Q(severity="Low")),
+ total=Count("id"),
+ )
+ )
+ rows = list(agg)
+
+ from dojo.models import Product # imported lazily to avoid circulars
+
+ products = Product.objects.in_bulk([r["prod_id"] for r in rows])
+ product_count = {
+ products[r["prod_id"]]: [r["crit"], r["high"], r["med"], r["low"], r["total"]] for r in rows
+ }
+ finding_count = {
+ "low": sum(r["low"] for r in rows),
+ "med": sum(r["med"] for r in rows),
+ "high": sum(r["high"] for r in rows),
+ "crit": sum(r["crit"] for r in rows),
+ }
return product_count, finding_count
diff --git a/unittests/test_view_engineer_metrics.py b/unittests/test_view_engineer_metrics.py
new file mode 100644
index 00000000000..fb0d9c3a3f5
--- /dev/null
+++ b/unittests/test_view_engineer_metrics.py
@@ -0,0 +1,308 @@
+"""Tests for the optimized view_engineer metrics endpoint"""
+
+from datetime import datetime, timedelta
+from unittest.mock import patch
+
+from django.test import RequestFactory, override_settings
+from django.urls import reverse
+from django.utils import timezone
+
+from dojo.models import Finding, Risk_Acceptance, User
+
+from .dojo_test_case import DojoTestCase
+
+
+@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
+class ViewEngineerMetricsTest(DojoTestCase):
+
+ """Test suite for the optimized view_engineer endpoint"""
+
+ fixtures = ["dojo_testdata.json"]
+
+ def setUp(self):
+ """Set up test data and common objects"""
+ self.factory = RequestFactory()
+ self.user1 = User.objects.get(username="user1")
+ self.user2 = User.objects.get(username="user2")
+ self.superuser = User.objects.get(username="admin")
+
+ self.test_findings = []
+ self.create_test_findings()
+
+ def create_test_findings(self):
+ """Create test findings with different severities and dates"""
+ now = timezone.now()
+
+ # Current month findings
+ for severity in ["Critical", "High", "Medium", "Low"]:
+ finding = Finding.objects.create(
+ title=f"Test Finding {severity}",
+ description=f"Test finding with {severity} severity",
+ severity=severity,
+ date=now.replace(day=15),
+ reporter=self.user1,
+ test_id=3,
+ verified=True,
+ active=True,
+ mitigated=None,
+ )
+ self.test_findings.append(finding)
+
+ # Previous month findings
+ prev_month = now - timedelta(days=30)
+ for severity in ["Critical", "High"]:
+ finding = Finding.objects.create(
+ title=f"Old Test Finding {severity}",
+ description=f"Old test finding with {severity} severity",
+ severity=severity,
+ date=prev_month,
+ reporter=self.user1,
+ test_id=3,
+ verified=True,
+ active=True,
+ mitigated=None,
+ )
+ self.test_findings.append(finding)
+
+ # Closed findings
+ for severity in ["High", "Medium"]:
+ finding = Finding.objects.create(
+ title=f"Closed Test Finding {severity}",
+ description=f"Closed test finding with {severity} severity",
+ severity=severity,
+ date=now.replace(day=10),
+ reporter=self.user1,
+ test_id=3,
+ verified=True,
+ active=False,
+ mitigated=now.replace(day=20),
+ mitigated_by=self.user1,
+ )
+ self.test_findings.append(finding)
+
+ def test_view_engineer_permission_denied_anonymous(self):
+ """Test that anonymous users cannot access view_engineer"""
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+ self.assertEqual(response.status_code, 302)
+
+ def test_view_engineer_permission_denied_other_user(self):
+ """Test that regular users cannot view other users' metrics"""
+ self.client.force_login(self.user2)
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+ # Django test client may return 400 or 403 for permission denied
+ self.assertIn(response.status_code, [400, 403])
+
+ def test_view_engineer_permission_allowed_self(self):
+ """Test that users can view their own metrics"""
+ self.client.force_login(self.user1)
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, self.user1.get_full_name())
+
+ def test_view_engineer_permission_allowed_superuser(self):
+ """Test that superusers can view any user's metrics"""
+ self.client.force_login(self.superuser)
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, self.user1.get_full_name())
+
+ @patch("django.utils.timezone.now")
+ def test_view_engineer_monthly_metrics_calculation(self, mock_now):
+ """Test that monthly metrics are calculated correctly"""
+ fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0))
+ mock_now.return_value = fixed_now
+
+ # Create findings for this specific month
+ Finding.objects.create(
+ title="June Critical Finding",
+ severity="Critical",
+ date=fixed_now.replace(day=10),
+ reporter=self.user1,
+ test_id=3,
+ verified=True,
+ active=True,
+ )
+ Finding.objects.create(
+ title="June High Finding",
+ severity="High",
+ date=fixed_now.replace(day=12),
+ reporter=self.user1,
+ test_id=3,
+ verified=True,
+ active=True,
+ )
+
+ self.client.force_login(self.user1)
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+ context = response.context
+
+ # Test actual counts, not just key existence
+ self.assertEqual(context["critical_open_month"], 1)
+ self.assertEqual(context["high_open_month"], 1)
+ # open_month is a QuerySet
+ self.assertGreaterEqual(context["open_month"].count(), 2)
+
+ @patch("django.utils.timezone.now")
+ def test_view_engineer_weekly_metrics_calculation(self, mock_now):
+ """Test that weekly metrics are calculated correctly"""
+ fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0))
+ mock_now.return_value = fixed_now
+
+ # Create findings for this week
+ Finding.objects.create(
+ title="This Week Finding",
+ severity="High",
+ date=fixed_now.replace(day=12), # Same week
+ reporter=self.user1,
+ test_id=3,
+ verified=True,
+ active=True,
+ )
+
+ self.client.force_login(self.user1)
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+ context = response.context
+
+ # Test that we have weekly data
+ if isinstance(context["open_week_count"], dict):
+ self.assertGreater(sum(context["open_week_count"].values()), 0)
+ else:
+ self.assertGreaterEqual(context["open_week_count"], 1)
+
+ # Test that open_week contains our test finding
+ self.assertGreater(context["open_week"].count(), 0)
+
+ @patch("django.utils.timezone.now")
+ def test_view_engineer_age_buckets_calculation(self, mock_now):
+ """Test age bucket calculations using DB aggregation"""
+ fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0))
+ mock_now.return_value = fixed_now
+
+ # Create findings with specific ages
+ Finding.objects.create(
+ title="Recent Finding",
+ severity="High",
+ date=fixed_now - timedelta(days=15), # Less than 30 days
+ reporter=self.user1,
+ test_id=3,
+ verified=True,
+ active=True,
+ )
+ Finding.objects.create(
+ title="Old Finding",
+ severity="Medium",
+ date=fixed_now - timedelta(days=100), # More than 90 days
+ reporter=self.user1,
+ test_id=3,
+ verified=True,
+ active=True,
+ )
+
+ self.client.force_login(self.user1)
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+ context = response.context
+
+ # Test actual age bucket counts
+ self.assertGreaterEqual(context["lt"], 1) # Recent finding
+ self.assertGreaterEqual(context["mn"], 1) # Old finding
+
+ # Verify they are integers from DB aggregation
+ self.assertIsInstance(context["lt"], int)
+ self.assertIsInstance(context["mn"], int)
+
+ @patch("django.utils.timezone.now")
+ def test_view_engineer_risk_acceptance_metrics(self, mock_now):
+ """Test risk acceptance handling in metrics"""
+ fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0))
+ mock_now.return_value = fixed_now
+
+ # Create a finding and accept it
+ finding = Finding.objects.create(
+ title="Finding to Accept",
+ severity="High",
+ date=fixed_now,
+ reporter=self.user1,
+ test_id=3,
+ verified=True,
+ active=True,
+ )
+
+ risk_acceptance = Risk_Acceptance.objects.create(
+ name="Test Risk Acceptance",
+ recommendation="A",
+ decision="A",
+ owner=self.user1,
+ created=fixed_now,
+ )
+ risk_acceptance.accepted_findings.add(finding)
+
+ self.client.force_login(self.user1)
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+ context = response.context
+
+ # Test that accepted findings are counted
+ self.assertGreaterEqual(context["high_a_month"], 1)
+ # a_month is a QuerySet
+ self.assertGreaterEqual(context["a_month"].count(), 1)
+
+ def test_view_engineer_chart_data_structure(self):
+ """Test chart data generation has correct structure"""
+ self.client.force_login(self.user1)
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+ context = response.context
+
+ chart_data = context["chart_data"]
+ self.assertIsInstance(chart_data, list)
+
+ if len(chart_data) > 0:
+ header = chart_data[0]
+ self.assertEqual(header, ["Date", "S0", "S1", "S2", "S3", "Total"])
+
+ def test_view_engineer_user_not_found(self):
+ """Test handling of non-existent user ID"""
+ self.client.force_login(self.superuser)
+ response = self.client.get(reverse("view_engineer", args=[99999]))
+ self.assertEqual(response.status_code, 404)
+
+ @patch("django.utils.timezone.now")
+ def test_view_engineer_empty_data(self, mock_now):
+ """Test view behavior with user who has no findings"""
+ fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0))
+ mock_now.return_value = fixed_now
+
+ empty_user = User.objects.create(
+ username="empty_user",
+ email="empty@example.com",
+ first_name="Empty",
+ last_name="User",
+ )
+
+ self.client.force_login(self.superuser)
+ response = self.client.get(reverse("view_engineer", args=[empty_user.id]))
+ context = response.context
+
+ # Should have zero values, not missing keys
+ self.assertEqual(context["critical_open_month"], 0)
+ self.assertEqual(context["high_open_month"], 0)
+ self.assertEqual(context["lt"], 0)
+ self.assertEqual(context["mn"], 0)
+
+ def test_view_engineer_context_completeness(self):
+ """Test that all expected context variables are present"""
+ self.client.force_login(self.user1)
+ response = self.client.get(reverse("view_engineer", args=[self.user1.id]))
+ context = response.context
+
+ # Test critical context keys exist
+ required_keys = [
+ "open_month", "a_month", "closed_month",
+ "critical_open_month", "high_open_month", "medium_open_month", "low_open_month",
+ "open_week_count", "closed_week_count",
+ "lt", "ls", "ln", "mn",
+ "chart_data", "name", "user",
+ ]
+
+ for key in required_keys:
+ self.assertIn(key, context, f"Missing required context key: {key}")