diff --git a/dojo/metrics/views.py b/dojo/metrics/views.py index 3efdb82a969..24c68381806 100644 --- a/dojo/metrics/views.py +++ b/dojo/metrics/views.py @@ -3,15 +3,12 @@ import logging import operator from calendar import monthrange -from collections import OrderedDict from datetime import date, datetime, timedelta -from functools import reduce -from operator import itemgetter from dateutil.relativedelta import relativedelta from django.contrib import messages from django.core.exceptions import PermissionDenied -from django.db.models import Case, Count, IntegerField, Q, Sum, Value, When +from django.db.models import Case, Count, F, IntegerField, Q, Sum, Value, When from django.http import HttpResponseRedirect from django.shortcuts import get_object_or_404, render from django.urls import reverse @@ -35,7 +32,7 @@ identify_view, severity_count, ) -from dojo.models import Dojo_User, Engagement, Finding, Product, Product_Type, Risk_Acceptance, Test +from dojo.models import Dojo_User, Finding, Product, Product_Type, Risk_Acceptance from dojo.product.queries import get_authorized_products from dojo.product_type.queries import get_authorized_product_types from dojo.utils import ( @@ -680,307 +677,255 @@ def engineer_metrics(request): """ -# noinspection DjangoOrm -@cache_page(60 * 5) # cache for 5 minutes @vary_on_cookie def view_engineer(request, eid): user = get_object_or_404(Dojo_User, pk=eid) - if not (request.user.is_superuser - or request.user.username == user.username): + if not (request.user.is_superuser or request.user.username == user.username): raise PermissionDenied + now = timezone.now() + tz = now.tzinfo - if get_system_setting("enforce_verified_status", True) or get_system_setting("enforce_verified_status_metrics", True): - findings = Finding.objects.filter(reporter=user, verified=True) - else: - findings = Finding.objects.filter(reporter=user) + # --------------- + # Base query-sets + reporter_findings = Finding.objects.filter(reporter=user) + if get_system_setting("enforce_verified_status", True) or get_system_setting( + "enforce_verified_status_metrics", True, + ): + reporter_findings = reporter_findings.filter(verified=True) closed_findings = Finding.objects.filter(mitigated_by=user) - open_findings = findings.exclude(mitigated__isnull=False) - open_month = findings.filter(date__year=now.year, date__month=now.month) - accepted_month = [finding for ra in Risk_Acceptance.objects.filter( - created__range=[datetime(now.year, - now.month, 1, - tzinfo=timezone.get_current_timezone()), - datetime(now.year, - now.month, - monthrange(now.year, - now.month)[1], - tzinfo=timezone.get_current_timezone())], - owner=user) - for finding in ra.accepted_findings.all()] - closed_month = [f for f in closed_findings - if f.mitigated and f.mitigated.year == now.year and f.mitigated.month == now.month] + open_findings = ( + reporter_findings.filter(mitigated__isnull=True) + .select_related("test__engagement__product__prod_type", "reporter") + .prefetch_related("risk_acceptance_set") + ) + + # -------------------- + # Month & week buckets + month_start = datetime(now.year, now.month, 1, tzinfo=tz) + month_end = month_start + relativedelta(months=1) # first day of next month (exclusive) + + open_month = reporter_findings.filter(date__gte=month_start, date__lt=month_end) + closed_month = closed_findings.filter(mitigated__gte=month_start, mitigated__lt=month_end) + accepted_month = ( + Finding.objects.filter( + risk_acceptance__owner=user, + risk_acceptance__created__gte=month_start, + risk_acceptance__created__lt=month_end, + ).distinct() + ) + + week_start = (now - timedelta(days=now.weekday())).replace(hour=0, minute=0, second=0, microsecond=0) + week_end = week_start + timedelta(days=7) # next Monday 00:00 (exclusive) + open_week = reporter_findings.filter(date__gte=week_start, date__lt=week_end) + closed_week = closed_findings.filter(mitigated__gte=week_start, mitigated__lt=week_end) + accepted_week = ( + Finding.objects.filter( + risk_acceptance__owner=user, + risk_acceptance__created__gte=week_start, + risk_acceptance__created__lt=week_end, + ).distinct() + ) o_dict, open_count = count_findings(open_month) c_dict, closed_count = count_findings(closed_month) a_dict, accepted_count = count_findings(accepted_month) - day_list = [now - relativedelta(weeks=1, - weekday=x, - hour=0, - minute=0, - second=0) - for x in range(now.weekday())] - day_list.append(now) - - q_objects = (Q(date=d) for d in day_list) - open_week = findings.filter(reduce(operator.or_, q_objects)) - - accepted_week = [finding for ra in Risk_Acceptance.objects.filter( - owner=user, created__range=[day_list[0], day_list[-1]]) - for finding in ra.accepted_findings.all()] - - q_objects = (Q(mitigated=d) for d in day_list) - # closed_week= findings.filter(reduce(operator.or_, q_objects)) - closed_week = [f for f in closed_findings if f.mitigated and f.mitigated >= day_list[0]] - o_week_dict, open_week_count = count_findings(open_week) c_week_dict, closed_week_count = count_findings(closed_week) a_week_dict, accepted_week_count = count_findings(accepted_week) - stuff = [] - o_stuff = [] - a_stuff = [] - findings_this_period(findings, 1, stuff, o_stuff, a_stuff) - # findings_this_period no longer fits the need for accepted findings - # however will use its week finding output to use here - for month in a_stuff: - month_start = datetime.strptime( - month[0].strip(), "%b %Y") - month_end = datetime(month_start.year, - month_start.month, - monthrange( - month_start.year, - month_start.month)[1], - tzinfo=timezone.get_current_timezone()) - for finding in [finding for ra in Risk_Acceptance.objects.filter( - created__range=[month_start, month_end], owner=user) - for finding in ra.accepted_findings.all()]: - if finding.severity == "Critical": - month[1] += 1 - if finding.severity == "High": - month[2] += 1 - if finding.severity == "Medium": - month[3] += 1 - if finding.severity == "Low": - month[4] += 1 - - month[5] = sum(month[1:]) - week_stuff = [] - week_o_stuff = [] - week_a_stuff = [] - findings_this_period(findings, 0, week_stuff, week_o_stuff, week_a_stuff) - - # findings_this_period no longer fits the need for accepted findings - # however will use its week finding output to use here - for week in week_a_stuff: - wk_range = week[0].split("-") - week_start = datetime.strptime( - wk_range[0].strip() + " " + str(now.year), "%b %d %Y") - week_end = datetime.strptime( - wk_range[1].strip() + " " + str(now.year), "%b %d %Y") - - for finding in [finding for ra in Risk_Acceptance.objects.filter( - created__range=[week_start, week_end], owner=user) - for finding in ra.accepted_findings.all()]: - if finding.severity == "Critical": - week[1] += 1 - if finding.severity == "High": - week[2] += 1 - if finding.severity == "Medium": - week[3] += 1 - if finding.severity == "Low": - week[4] += 1 - - week[5] = sum(week[1:]) - - products = get_authorized_products(Permissions.Product_Type_View) - vulns = {} - for product in products: - f_count = 0 - engs = Engagement.objects.filter(product=product) - for eng in engs: - tests = Test.objects.filter(engagement=eng) - for test in tests: - f_count += findings.filter(test=test, - mitigated__isnull=True, - active=True).count() - vulns[product.id] = f_count - od = OrderedDict(sorted(vulns.items(), key=itemgetter(1))) - items = list(od.items()) - items.reverse() - top = items[: 10] - update = [] - for t in top: - product = t[0] - z_count = 0 - o_count = 0 - t_count = 0 - h_count = 0 - engs = Engagement.objects.filter( - product=Product.objects.get(id=product)) - for eng in engs: - tests = Test.objects.filter(engagement=eng) - for test in tests: - z_count += findings.filter( - test=test, - mitigated__isnull=True, - severity="Critical", - ).count() - o_count += findings.filter( - test=test, - mitigated__isnull=True, - severity="High", - ).count() - t_count += findings.filter( - test=test, - mitigated__isnull=True, - severity="Medium", - ).count() - h_count += findings.filter( - test=test, - mitigated__isnull=True, - severity="Low", - ).count() - prod = Product.objects.get(id=product) - all_findings_link = "{}".format( - reverse("product_open_findings", args=(prod.id,)), escape(prod.name)) - update.append([all_findings_link, z_count, o_count, t_count, h_count, - z_count + o_count + t_count + h_count]) - total_update = [] - for i in items: - product = i[0] - z_count = 0 - o_count = 0 - t_count = 0 - h_count = 0 - engs = Engagement.objects.filter( - product=Product.objects.get(id=product)) - for eng in engs: - tests = Test.objects.filter(engagement=eng) - for test in tests: - z_count += findings.filter( - test=test, - mitigated__isnull=True, - severity="Critical").count() - o_count += findings.filter( - test=test, - mitigated__isnull=True, - severity="High").count() - t_count += findings.filter( - test=test, - mitigated__isnull=True, - severity="Medium").count() - h_count += findings.filter( - test=test, - mitigated__isnull=True, - severity="Low").count() - prod = Product.objects.get(id=product) - all_findings_link = "{}".format( - reverse("product_open_findings", args=(prod.id,)), escape(prod.name)) - total_update.append([all_findings_link, z_count, o_count, t_count, - h_count, z_count + o_count + t_count + h_count]) - - neg_length = len(stuff) - findz = findings.filter(mitigated__isnull=True, active=True, - risk_acceptance=None) - findz = findz.filter(Q(severity="Critical") | Q(severity="High")) - less_thirty = 0 - less_sixty = 0 - less_nine = 0 - more_nine = 0 - for finding in findz: - elapsed = date.today() - finding.date - if elapsed <= timedelta(days=30): - less_thirty += 1 - elif elapsed <= timedelta(days=60): - less_sixty += 1 - elif elapsed <= timedelta(days=90): - less_nine += 1 - else: - more_nine += 1 - - # Data for the monthly charts - chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"]] - for thing in o_stuff: - chart_data.insert(1, thing) - - a_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"]] - for thing in a_stuff: - a_chart_data.insert(1, thing) - - # Data for the weekly charts - week_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"]] - for thing in week_o_stuff: - week_chart_data.insert(1, thing) - - week_a_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"]] - for thing in week_a_stuff: - week_a_chart_data.insert(1, thing) - - details = [] - for find in open_findings: - team = find.test.engagement.product.prod_type.name - name = find.test.engagement.product.name - severity = find.severity - description = find.title - life = date.today() - find.date - life = life.days - status = "Active" - if find.risk_accepted: - status = "Accepted" - detail = [team, name, severity, description, life, status, find.reporter] - details.append(detail) - - details = sorted(details, key=itemgetter(2)) + # -------------------------- + # Historic series for charts + monthly_total_series, monthly_open_series, monthly_accepted_series = [], [], [] + findings_this_period(reporter_findings, 1, monthly_total_series, monthly_open_series, monthly_accepted_series) + + weekly_total_series, weekly_open_series, weekly_accepted_series = [], [], [] + findings_this_period(reporter_findings, 0, weekly_total_series, weekly_open_series, weekly_accepted_series) + + ras_owner_qs = Risk_Acceptance.objects.filter(owner=user) + _augment_series_with_accepted(monthly_accepted_series, ras_owner_qs, period="month", tz=tz) + _augment_series_with_accepted(weekly_accepted_series, ras_owner_qs, period="week", tz=tz) + + chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"], *monthly_open_series] + a_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"], *monthly_accepted_series] + week_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"], *weekly_open_series] + week_a_chart_data = [["Date", "S0", "S1", "S2", "S3", "Total"], *weekly_accepted_series] + + # -------------- + # Product tables + products = list(get_authorized_products(Permissions.Product_Type_View).only("id", "name")) + update, total_update = _product_stats(products) + + # ---------------------------------- + # Age buckets for open critical/high + high_crit_open = reporter_findings.filter( + mitigated__isnull=True, + active=True, + risk_acceptance=None, + severity__in=["Critical", "High"], + ) + age_buckets = _age_buckets(high_crit_open) + + # ------------- + # Details table + details = sorted( + ( + [ + f.test.engagement.product.prod_type.name, + f.test.engagement.product.name, + f.severity, + f.title, + (date.today() - f.date).days, + "Accepted" if f.risk_accepted else "Active", + f.reporter, + ] + for f in open_findings + ), + key=operator.itemgetter(2), + ) add_breadcrumb(title=f"{user.get_full_name()} Metrics", top_level=False, request=request) - return render(request, "dojo/view_engineer.html", { - "open_month": open_month, - "a_month": accepted_month, - "low_a_month": accepted_count["low"], - "medium_a_month": accepted_count["med"], - "high_a_month": accepted_count["high"], - "critical_a_month": accepted_count["crit"], - "closed_month": closed_month, - "low_open_month": open_count["low"], - "medium_open_month": open_count["med"], - "high_open_month": open_count["high"], - "critical_open_month": open_count["crit"], - "low_c_month": closed_count["low"], - "medium_c_month": closed_count["med"], - "high_c_month": closed_count["high"], - "critical_c_month": closed_count["crit"], - "week_stuff": week_stuff, - "week_a_stuff": week_a_stuff, - "a_total": a_stuff, - "total": stuff, - "sub": neg_length, - "update": update, - "lt": less_thirty, - "ls": less_sixty, - "ln": less_nine, - "mn": more_nine, - "chart_data": chart_data, - "a_chart_data": a_chart_data, - "week_chart_data": week_chart_data, - "week_a_chart_data": week_a_chart_data, - "name": f"{user.get_full_name()} Metrics", - "metric": True, - "total_update": total_update, - "details": details, - "open_week": open_week, - "closed_week": closed_week, - "accepted_week": accepted_week, - "a_dict": a_dict, - "o_dict": o_dict, - "c_dict": c_dict, - "o_week_dict": o_week_dict, - "a_week_dict": a_week_dict, - "c_week_dict": c_week_dict, - "open_week_count": open_week_count, - "accepted_week_count": accepted_week_count, - "closed_week_count": closed_week_count, - "user": request.user, - }) + return render( + request, + "dojo/view_engineer.html", + { + # month + "open_month": open_month, + "a_month": accepted_month, + "low_a_month": accepted_count["low"], + "medium_a_month": accepted_count["med"], + "high_a_month": accepted_count["high"], + "critical_a_month": accepted_count["crit"], + "closed_month": closed_month, + "low_open_month": open_count["low"], + "medium_open_month": open_count["med"], + "high_open_month": open_count["high"], + "critical_open_month": open_count["crit"], + "low_c_month": closed_count["low"], + "medium_c_month": closed_count["med"], + "high_c_month": closed_count["high"], + "critical_c_month": closed_count["crit"], + # week + "week_stuff": weekly_total_series, + "week_a_stuff": weekly_accepted_series, + # series + "a_total": monthly_accepted_series, + "total": monthly_total_series, + "sub": len(monthly_total_series), + # product tables + "update": update, + "total_update": total_update, + # aged buckets + "lt": age_buckets["lt"], + "ls": age_buckets["ls"], + "ln": age_buckets["ln"], + "mn": age_buckets["mn"], + # charts + "chart_data": chart_data, + "a_chart_data": a_chart_data, + "week_chart_data": week_chart_data, + "week_a_chart_data": week_a_chart_data, + # misc + "name": f"{user.get_full_name()} Metrics", + "metric": True, + "details": details, + "open_week": open_week, + "closed_week": closed_week, + "accepted_week": accepted_week, + "a_dict": a_dict, + "o_dict": o_dict, + "c_dict": c_dict, + "o_week_dict": o_week_dict, + "a_week_dict": a_week_dict, + "c_week_dict": c_week_dict, + "open_week_count": open_week_count, + "accepted_week_count": accepted_week_count, + "closed_week_count": closed_week_count, + "user": request.user, + }, + ) + + +def _age_buckets(qs): + """Return aged high/critical finding counts in one SQL round-trip.""" + today = date.today() + return qs.aggregate( + lt=Count("id", filter=Q(date__gte=today - timedelta(days=30))), + ls=Count("id", filter=Q(date__lte=today - timedelta(days=30), date__gt=today - timedelta(days=60))), + ln=Count("id", filter=Q(date__lte=today - timedelta(days=60), date__gt=today - timedelta(days=90))), + mn=Count("id", filter=Q(date__lte=today - timedelta(days=90))), + ) + + +def _augment_series_with_accepted(series: list[list], ras_qs, *, period: str, tz): + """Mutate `series` in-place, adding per-severity counts for accepted findings.""" + if not series: # no buckets to augment + return + + first_ra = ras_qs.first() + if first_ra is None: # engineer has no risk acceptances at all + return + + owner = first_ra.owner + sev_idx = {"Critical": 1, "High": 2, "Medium": 3, "Low": 4} + + for bucket in series: + if period == "month": + start = datetime.strptime(bucket[0].strip(), "%b %Y").replace(tzinfo=tz) + end = start + relativedelta(months=1) # first day of next month (exclusive) + else: # "week" + wk_a, _ = (d.strip() for d in bucket[0].split("-")) + year = timezone.now().year + start = datetime.strptime(f"{wk_a} {year}", "%b %d %Y").replace(tzinfo=tz) + end = start + timedelta(days=7) # next Monday 00:00 (exclusive) + + accepted = ( + Finding.objects.filter( + risk_acceptance__owner=owner, + risk_acceptance__created__gte=start, + risk_acceptance__created__lt=end, + ) + .values("severity") + .annotate(cnt=Count("id")) + ) + + for row in accepted: + bucket[sev_idx[row["severity"]]] += row["cnt"] + + bucket[5] = sum(bucket[1:]) + + +def _product_stats(products) -> tuple[list, list]: + """ + Return two tables: + * `update` - top-10 products by open findings + * `total_update` - all authorized products + """ + counts = ( + Finding.objects.filter(test__engagement__product__in=products, mitigated__isnull=True, active=True) + .values(pid=F("test__engagement__product")) + .annotate( + critical=Count("id", filter=Q(severity="Critical")), + high=Count("id", filter=Q(severity="High")), + medium=Count("id", filter=Q(severity="Medium")), + low=Count("id", filter=Q(severity="Low")), + total=Count("id"), + ) + ) + by_id = {c["pid"]: c for c in counts} + top10 = sorted(by_id.items(), key=lambda kv: kv[1]["total"], reverse=True)[:10] + + product_lookup = {p.id: p for p in products} + + def row(prod_id): + prod = product_lookup[prod_id] + link = f"{escape(prod.name)}" + data = by_id[prod_id] + return [link, data["critical"], data["high"], data["medium"], data["low"], data["total"]] + + update = [row(pid) for pid, _ in top10] + total_update = [row(p.id) for p in products if p.id in by_id] + + return update, total_update diff --git a/dojo/utils.py b/dojo/utils.py index b6fba9a5eb9..e3b89954cf7 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ -30,7 +30,7 @@ from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed from django.contrib.contenttypes.models import ContentType from django.core.paginator import Paginator -from django.db.models import Case, Count, IntegerField, Q, Subquery, Sum, Value, When +from django.db.models import Case, Count, F, IntegerField, Q, Subquery, Sum, Value, When from django.db.models.query import QuerySet from django.db.models.signals import post_save from django.dispatch import receiver @@ -578,40 +578,31 @@ def set_duplicate_reopen(new_finding, existing_finding): existing_finding.save() -def count_findings(findings): - product_count = {} - finding_count = {"low": 0, "med": 0, "high": 0, "crit": 0} - for f in findings: - product = f.test.engagement.product - if product in product_count: - product_count[product][4] += 1 - if f.severity == "Low": - product_count[product][3] += 1 - finding_count["low"] += 1 - if f.severity == "Medium": - product_count[product][2] += 1 - finding_count["med"] += 1 - if f.severity == "High": - product_count[product][1] += 1 - finding_count["high"] += 1 - if f.severity == "Critical": - product_count[product][0] += 1 - finding_count["crit"] += 1 - else: - product_count[product] = [0, 0, 0, 0, 0] - product_count[product][4] += 1 - if f.severity == "Low": - product_count[product][3] += 1 - finding_count["low"] += 1 - if f.severity == "Medium": - product_count[product][2] += 1 - finding_count["med"] += 1 - if f.severity == "High": - product_count[product][1] += 1 - finding_count["high"] += 1 - if f.severity == "Critical": - product_count[product][0] += 1 - finding_count["crit"] += 1 +def count_findings(findings: QuerySet) -> tuple[dict["Product", list[int]], dict[str, int]]: + agg = ( + findings.values(prod_id=F("test__engagement__product_id")) + .annotate( + crit=Count("id", filter=Q(severity="Critical")), + high=Count("id", filter=Q(severity="High")), + med=Count("id", filter=Q(severity="Medium")), + low=Count("id", filter=Q(severity="Low")), + total=Count("id"), + ) + ) + rows = list(agg) + + from dojo.models import Product # imported lazily to avoid circulars + + products = Product.objects.in_bulk([r["prod_id"] for r in rows]) + product_count = { + products[r["prod_id"]]: [r["crit"], r["high"], r["med"], r["low"], r["total"]] for r in rows + } + finding_count = { + "low": sum(r["low"] for r in rows), + "med": sum(r["med"] for r in rows), + "high": sum(r["high"] for r in rows), + "crit": sum(r["crit"] for r in rows), + } return product_count, finding_count diff --git a/unittests/test_view_engineer_metrics.py b/unittests/test_view_engineer_metrics.py new file mode 100644 index 00000000000..fb0d9c3a3f5 --- /dev/null +++ b/unittests/test_view_engineer_metrics.py @@ -0,0 +1,308 @@ +"""Tests for the optimized view_engineer metrics endpoint""" + +from datetime import datetime, timedelta +from unittest.mock import patch + +from django.test import RequestFactory, override_settings +from django.urls import reverse +from django.utils import timezone + +from dojo.models import Finding, Risk_Acceptance, User + +from .dojo_test_case import DojoTestCase + + +@override_settings(CELERY_TASK_ALWAYS_EAGER=True) +class ViewEngineerMetricsTest(DojoTestCase): + + """Test suite for the optimized view_engineer endpoint""" + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + """Set up test data and common objects""" + self.factory = RequestFactory() + self.user1 = User.objects.get(username="user1") + self.user2 = User.objects.get(username="user2") + self.superuser = User.objects.get(username="admin") + + self.test_findings = [] + self.create_test_findings() + + def create_test_findings(self): + """Create test findings with different severities and dates""" + now = timezone.now() + + # Current month findings + for severity in ["Critical", "High", "Medium", "Low"]: + finding = Finding.objects.create( + title=f"Test Finding {severity}", + description=f"Test finding with {severity} severity", + severity=severity, + date=now.replace(day=15), + reporter=self.user1, + test_id=3, + verified=True, + active=True, + mitigated=None, + ) + self.test_findings.append(finding) + + # Previous month findings + prev_month = now - timedelta(days=30) + for severity in ["Critical", "High"]: + finding = Finding.objects.create( + title=f"Old Test Finding {severity}", + description=f"Old test finding with {severity} severity", + severity=severity, + date=prev_month, + reporter=self.user1, + test_id=3, + verified=True, + active=True, + mitigated=None, + ) + self.test_findings.append(finding) + + # Closed findings + for severity in ["High", "Medium"]: + finding = Finding.objects.create( + title=f"Closed Test Finding {severity}", + description=f"Closed test finding with {severity} severity", + severity=severity, + date=now.replace(day=10), + reporter=self.user1, + test_id=3, + verified=True, + active=False, + mitigated=now.replace(day=20), + mitigated_by=self.user1, + ) + self.test_findings.append(finding) + + def test_view_engineer_permission_denied_anonymous(self): + """Test that anonymous users cannot access view_engineer""" + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + self.assertEqual(response.status_code, 302) + + def test_view_engineer_permission_denied_other_user(self): + """Test that regular users cannot view other users' metrics""" + self.client.force_login(self.user2) + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + # Django test client may return 400 or 403 for permission denied + self.assertIn(response.status_code, [400, 403]) + + def test_view_engineer_permission_allowed_self(self): + """Test that users can view their own metrics""" + self.client.force_login(self.user1) + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, self.user1.get_full_name()) + + def test_view_engineer_permission_allowed_superuser(self): + """Test that superusers can view any user's metrics""" + self.client.force_login(self.superuser) + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, self.user1.get_full_name()) + + @patch("django.utils.timezone.now") + def test_view_engineer_monthly_metrics_calculation(self, mock_now): + """Test that monthly metrics are calculated correctly""" + fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0)) + mock_now.return_value = fixed_now + + # Create findings for this specific month + Finding.objects.create( + title="June Critical Finding", + severity="Critical", + date=fixed_now.replace(day=10), + reporter=self.user1, + test_id=3, + verified=True, + active=True, + ) + Finding.objects.create( + title="June High Finding", + severity="High", + date=fixed_now.replace(day=12), + reporter=self.user1, + test_id=3, + verified=True, + active=True, + ) + + self.client.force_login(self.user1) + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + context = response.context + + # Test actual counts, not just key existence + self.assertEqual(context["critical_open_month"], 1) + self.assertEqual(context["high_open_month"], 1) + # open_month is a QuerySet + self.assertGreaterEqual(context["open_month"].count(), 2) + + @patch("django.utils.timezone.now") + def test_view_engineer_weekly_metrics_calculation(self, mock_now): + """Test that weekly metrics are calculated correctly""" + fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0)) + mock_now.return_value = fixed_now + + # Create findings for this week + Finding.objects.create( + title="This Week Finding", + severity="High", + date=fixed_now.replace(day=12), # Same week + reporter=self.user1, + test_id=3, + verified=True, + active=True, + ) + + self.client.force_login(self.user1) + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + context = response.context + + # Test that we have weekly data + if isinstance(context["open_week_count"], dict): + self.assertGreater(sum(context["open_week_count"].values()), 0) + else: + self.assertGreaterEqual(context["open_week_count"], 1) + + # Test that open_week contains our test finding + self.assertGreater(context["open_week"].count(), 0) + + @patch("django.utils.timezone.now") + def test_view_engineer_age_buckets_calculation(self, mock_now): + """Test age bucket calculations using DB aggregation""" + fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0)) + mock_now.return_value = fixed_now + + # Create findings with specific ages + Finding.objects.create( + title="Recent Finding", + severity="High", + date=fixed_now - timedelta(days=15), # Less than 30 days + reporter=self.user1, + test_id=3, + verified=True, + active=True, + ) + Finding.objects.create( + title="Old Finding", + severity="Medium", + date=fixed_now - timedelta(days=100), # More than 90 days + reporter=self.user1, + test_id=3, + verified=True, + active=True, + ) + + self.client.force_login(self.user1) + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + context = response.context + + # Test actual age bucket counts + self.assertGreaterEqual(context["lt"], 1) # Recent finding + self.assertGreaterEqual(context["mn"], 1) # Old finding + + # Verify they are integers from DB aggregation + self.assertIsInstance(context["lt"], int) + self.assertIsInstance(context["mn"], int) + + @patch("django.utils.timezone.now") + def test_view_engineer_risk_acceptance_metrics(self, mock_now): + """Test risk acceptance handling in metrics""" + fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0)) + mock_now.return_value = fixed_now + + # Create a finding and accept it + finding = Finding.objects.create( + title="Finding to Accept", + severity="High", + date=fixed_now, + reporter=self.user1, + test_id=3, + verified=True, + active=True, + ) + + risk_acceptance = Risk_Acceptance.objects.create( + name="Test Risk Acceptance", + recommendation="A", + decision="A", + owner=self.user1, + created=fixed_now, + ) + risk_acceptance.accepted_findings.add(finding) + + self.client.force_login(self.user1) + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + context = response.context + + # Test that accepted findings are counted + self.assertGreaterEqual(context["high_a_month"], 1) + # a_month is a QuerySet + self.assertGreaterEqual(context["a_month"].count(), 1) + + def test_view_engineer_chart_data_structure(self): + """Test chart data generation has correct structure""" + self.client.force_login(self.user1) + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + context = response.context + + chart_data = context["chart_data"] + self.assertIsInstance(chart_data, list) + + if len(chart_data) > 0: + header = chart_data[0] + self.assertEqual(header, ["Date", "S0", "S1", "S2", "S3", "Total"]) + + def test_view_engineer_user_not_found(self): + """Test handling of non-existent user ID""" + self.client.force_login(self.superuser) + response = self.client.get(reverse("view_engineer", args=[99999])) + self.assertEqual(response.status_code, 404) + + @patch("django.utils.timezone.now") + def test_view_engineer_empty_data(self, mock_now): + """Test view behavior with user who has no findings""" + fixed_now = timezone.make_aware(datetime(2023, 6, 15, 10, 0, 0)) + mock_now.return_value = fixed_now + + empty_user = User.objects.create( + username="empty_user", + email="empty@example.com", + first_name="Empty", + last_name="User", + ) + + self.client.force_login(self.superuser) + response = self.client.get(reverse("view_engineer", args=[empty_user.id])) + context = response.context + + # Should have zero values, not missing keys + self.assertEqual(context["critical_open_month"], 0) + self.assertEqual(context["high_open_month"], 0) + self.assertEqual(context["lt"], 0) + self.assertEqual(context["mn"], 0) + + def test_view_engineer_context_completeness(self): + """Test that all expected context variables are present""" + self.client.force_login(self.user1) + response = self.client.get(reverse("view_engineer", args=[self.user1.id])) + context = response.context + + # Test critical context keys exist + required_keys = [ + "open_month", "a_month", "closed_month", + "critical_open_month", "high_open_month", "medium_open_month", "low_open_month", + "open_week_count", "closed_week_count", + "lt", "ls", "ln", "mn", + "chart_data", "name", "user", + ] + + for key in required_keys: + self.assertIn(key, context, f"Missing required context key: {key}")