|
11 | 11 | from django.contrib.auth.models import Permission
|
12 | 12 | from django.core.exceptions import ValidationError
|
13 | 13 | from django.db import IntegrityError
|
| 14 | +from django.db.models import Count, Q |
14 | 15 | from django.http import FileResponse, Http404, HttpResponse
|
15 | 16 | from django.shortcuts import get_object_or_404
|
16 | 17 | from django.utils import timezone
|
@@ -3180,6 +3181,178 @@ def get_queryset(self):
|
3180 | 3181 | return Answered_Survey.objects.all().order_by("id")
|
3181 | 3182 |
|
3182 | 3183 |
|
| 3184 | +# Authorization: object-based (consistent with UI) |
| 3185 | +class SimpleMetricsViewSet( |
| 3186 | + viewsets.ReadOnlyModelViewSet, |
| 3187 | +): |
| 3188 | + |
| 3189 | + """ |
| 3190 | + Simple metrics API endpoint that provides finding counts by product type |
| 3191 | + broken down by severity and month status. |
| 3192 | +
|
| 3193 | + This endpoint replicates the logic from the UI's /metrics/simple endpoint |
| 3194 | + and uses the same authorization model for consistency. |
| 3195 | + """ |
| 3196 | + |
| 3197 | + serializer_class = serializers.SimpleMetricsSerializer |
| 3198 | + queryset = Product_Type.objects.none() # Required for consistent auth behavior |
| 3199 | + permission_classes = (IsAuthenticated,) # Match pattern used by RoleViewSet |
| 3200 | + pagination_class = None |
| 3201 | + |
| 3202 | + @extend_schema( |
| 3203 | + parameters=[ |
| 3204 | + OpenApiParameter( |
| 3205 | + "date", |
| 3206 | + OpenApiTypes.DATE, |
| 3207 | + OpenApiParameter.QUERY, |
| 3208 | + required=False, |
| 3209 | + description="Date to generate metrics for (YYYY-MM-DD format). Defaults to current month.", |
| 3210 | + ), |
| 3211 | + OpenApiParameter( |
| 3212 | + "product_type_id", |
| 3213 | + OpenApiTypes.INT, |
| 3214 | + OpenApiParameter.QUERY, |
| 3215 | + required=False, |
| 3216 | + description="Optional product type ID to filter metrics. If not provided, returns all accessible product types.", |
| 3217 | + ), |
| 3218 | + ], |
| 3219 | + responses={status.HTTP_200_OK: serializers.SimpleMetricsSerializer(many=True)}, |
| 3220 | + ) |
| 3221 | + def list(self, request): |
| 3222 | + """ |
| 3223 | + Retrieve simple metrics data for the requested month grouped by product type. |
| 3224 | +
|
| 3225 | + This endpoint replicates the logic from the UI's /metrics/simple endpoint |
| 3226 | + and uses the same authorization model for consistency. |
| 3227 | +
|
| 3228 | + Performance optimized with database aggregation instead of Python loops. |
| 3229 | + """ |
| 3230 | + # Parse the date parameter, default to current month (same as UI) |
| 3231 | + now = timezone.now() |
| 3232 | + date_param = request.query_params.get("date") |
| 3233 | + product_type_id = request.query_params.get("product_type_id") |
| 3234 | + |
| 3235 | + if date_param: |
| 3236 | + # Enhanced input validation while maintaining consistency with UI behavior |
| 3237 | + if len(date_param) > 20: |
| 3238 | + return Response( |
| 3239 | + {"error": "Invalid date parameter length."}, |
| 3240 | + status=status.HTTP_400_BAD_REQUEST, |
| 3241 | + ) |
| 3242 | + |
| 3243 | + # Sanitize input - only allow alphanumeric characters and hyphens |
| 3244 | + import re |
| 3245 | + if not re.match(r"^[0-9\-]+$", date_param): |
| 3246 | + return Response( |
| 3247 | + {"error": "Invalid date format. Only numbers and hyphens allowed."}, |
| 3248 | + status=status.HTTP_400_BAD_REQUEST, |
| 3249 | + ) |
| 3250 | + |
| 3251 | + try: |
| 3252 | + # Parse date string with validation |
| 3253 | + parsed_date = datetime.strptime(date_param, "%Y-%m-%d") |
| 3254 | + |
| 3255 | + # Reasonable date range validation |
| 3256 | + min_date = datetime(2000, 1, 1) |
| 3257 | + max_date = datetime.now() + relativedelta(years=1) |
| 3258 | + |
| 3259 | + if parsed_date < min_date or parsed_date > max_date: |
| 3260 | + return Response( |
| 3261 | + {"error": "Date must be between 2000-01-01 and one year from now."}, |
| 3262 | + status=status.HTTP_400_BAD_REQUEST, |
| 3263 | + ) |
| 3264 | + |
| 3265 | + # Make it timezone aware |
| 3266 | + now = timezone.make_aware(parsed_date) if timezone.is_naive(parsed_date) else parsed_date |
| 3267 | + |
| 3268 | + except ValueError: |
| 3269 | + return Response( |
| 3270 | + {"error": "Invalid date format. Use YYYY-MM-DD format."}, |
| 3271 | + status=status.HTTP_400_BAD_REQUEST, |
| 3272 | + ) |
| 3273 | + |
| 3274 | + # Get authorized product types (same as UI implementation) |
| 3275 | + product_types = get_authorized_product_types(Permissions.Product_Type_View) |
| 3276 | + |
| 3277 | + # Optional filtering by specific product type |
| 3278 | + if product_type_id: |
| 3279 | + try: |
| 3280 | + product_type_id = int(product_type_id) |
| 3281 | + product_types = product_types.filter(id=product_type_id) |
| 3282 | + if not product_types.exists(): |
| 3283 | + return Response( |
| 3284 | + {"error": "Product type not found or access denied."}, |
| 3285 | + status=status.HTTP_404_NOT_FOUND, |
| 3286 | + ) |
| 3287 | + except ValueError: |
| 3288 | + return Response( |
| 3289 | + {"error": "Invalid product_type_id format."}, |
| 3290 | + status=status.HTTP_400_BAD_REQUEST, |
| 3291 | + ) |
| 3292 | + |
| 3293 | + # Build base filter conditions (same logic as UI) |
| 3294 | + base_filter = Q( |
| 3295 | + false_p=False, |
| 3296 | + duplicate=False, |
| 3297 | + out_of_scope=False, |
| 3298 | + date__month=now.month, |
| 3299 | + date__year=now.year, |
| 3300 | + ) |
| 3301 | + |
| 3302 | + # Apply verification status filtering if enabled (same as UI) |
| 3303 | + if (get_system_setting("enforce_verified_status", True) or |
| 3304 | + get_system_setting("enforce_verified_status_metrics", True)): |
| 3305 | + base_filter &= Q(verified=True) |
| 3306 | + |
| 3307 | + # Optimize with single aggregated query per product type |
| 3308 | + metrics_data = [] |
| 3309 | + |
| 3310 | + for pt in product_types: |
| 3311 | + # Single aggregated query replacing the Python loop |
| 3312 | + metrics = Finding.objects.filter( |
| 3313 | + test__engagement__product__prod_type=pt, |
| 3314 | + ).filter(base_filter).aggregate( |
| 3315 | + # Total count |
| 3316 | + total=Count("id"), |
| 3317 | + |
| 3318 | + # Count by severity using conditional aggregation |
| 3319 | + critical=Count("id", filter=Q(severity="Critical")), |
| 3320 | + high=Count("id", filter=Q(severity="High")), |
| 3321 | + medium=Count("id", filter=Q(severity="Medium")), |
| 3322 | + low=Count("id", filter=Q(severity="Low")), |
| 3323 | + info=Count("id", filter=~Q(severity__in=["Critical", "High", "Medium", "Low"])), |
| 3324 | + |
| 3325 | + # Count opened in current month |
| 3326 | + opened=Count("id", filter=Q(date__year=now.year, date__month=now.month)), |
| 3327 | + |
| 3328 | + # Count closed in current month |
| 3329 | + closed=Count("id", filter=Q( |
| 3330 | + mitigated__isnull=False, |
| 3331 | + mitigated__year=now.year, |
| 3332 | + mitigated__month=now.month, |
| 3333 | + )), |
| 3334 | + ) |
| 3335 | + |
| 3336 | + # Build the findings summary (same structure as UI) |
| 3337 | + findings_broken_out = { |
| 3338 | + "product_type_id": pt.id, |
| 3339 | + "product_type_name": pt.name, # Always show real name like UI |
| 3340 | + "Total": metrics["total"] or 0, |
| 3341 | + "S0": metrics["critical"] or 0, # Critical |
| 3342 | + "S1": metrics["high"] or 0, # High |
| 3343 | + "S2": metrics["medium"] or 0, # Medium |
| 3344 | + "S3": metrics["low"] or 0, # Low |
| 3345 | + "S4": metrics["info"] or 0, # Info |
| 3346 | + "Opened": metrics["opened"] or 0, |
| 3347 | + "Closed": metrics["closed"] or 0, |
| 3348 | + } |
| 3349 | + |
| 3350 | + metrics_data.append(findings_broken_out) |
| 3351 | + |
| 3352 | + serializer = self.serializer_class(metrics_data, many=True) |
| 3353 | + return Response(serializer.data) |
| 3354 | + |
| 3355 | + |
3183 | 3356 | # Authorization: configuration
|
3184 | 3357 | class AnnouncementViewSet(
|
3185 | 3358 | DojoModelViewSet,
|
|
0 commit comments