def scan_and_create_incidents(
    self,
    documents: List[Dict[str, str]],
    source_uuid: Union[UUID, str],
    *,
    extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, MultiScanResult]:
    """
    scan_and_create_incidents handles the /scan/create-incidents endpoint of the API.

    If documents contain `0` bytes, they will be replaced with the ASCII substitute
    character.

    :param documents: List of dictionaries containing the keys document
                      and filename. The filename is sent to the API as the
                      `document_identifier` of the document.
                      example: [{"document":"example content","filename":"intro.py"}]
    :param source_uuid: the source UUID (a `UUID` instance or its string form)
                      that will be used to identify the custom source, for which
                      incidents will be created
    :param extra_headers: additional headers to add to the request
    :return: Detail or MultiScanResult response and status code
    :raises ValueError: if more documents are submitted than
                      `secret_scan_preferences.maximum_documents_per_scan` allows
    :raises TypeError: if any element of `documents` is not a dict
    """
    max_documents = self.secret_scan_preferences.maximum_documents_per_scan
    if len(documents) > max_documents:
        raise ValueError(
            f"too many documents submitted for scan (max={max_documents})"
        )

    if not all(isinstance(doc, dict) for doc in documents):
        raise TypeError("each document must be a dict")

    request_obj = cast(
        List[Dict[str, Any]], Document.SCHEMA.load(documents, many=True)
    )

    # Validate documents using DocumentSchema
    for document in request_obj:
        DocumentSchema.validate_size(
            document, self.secret_scan_preferences.maximum_document_size
        )

    payload = {
        # The payload is sent as a JSON body and a UUID instance is not JSON
        # serializable, so always send the canonical string form. str() leaves
        # an already-stringified UUID untouched.
        "source_uuid": str(source_uuid),
        "documents": [
            {
                # NOTE(review): a document without "filename" presumably raises
                # KeyError here unless the schema fills the key in - confirm
                # whether the endpoint accepts a missing document_identifier.
                "document_identifier": document["filename"],
                "document": document["document"],
            }
            for document in request_obj
        ],
    }
    resp = self.post(
        endpoint="scan/create-incidents",
        data=payload,
        extra_headers=extra_headers,
    )

    obj: Union[Detail, MultiScanResult]
    if is_ok(resp):
        # The endpoint answers with a bare list of scan results; wrap it in the
        # shape MultiScanResult.from_dict expects.
        obj = MultiScanResult.from_dict({"scan_results": resp.json()})
    else:
        obj = load_detail(resp)

    obj.status_code = resp.status_code

    return obj
@responses.activate
def test_scan_and_create_incidents_parameters(client: GGClient):
    """
    GIVEN a ggclient
    WHEN scan_and_create_incidents is called with a document and a source UUID
    THEN a single request matching the (empty) expected query parameters
         reaches the scan/create-incidents endpoint
    """

    expected_query = {}

    # One scan result holding a single policy break, as served by the mock.
    policy_break = {
        "type": "break",
        "detector_name": "break",
        "detector_group_name": "break",
        "documentation_url": None,
        "policy": "mypol",
        "matches": [{"match": "hello", "type": "hello"}],
    }
    scan_result = {
        "policy_break_count": 1,
        "policies": ["pol"],
        "policy_breaks": [policy_break],
    }

    endpoint_mock = responses.post(
        url=client._url_from_endpoint("scan/create-incidents", "v1"),
        status=200,
        match=[matchers.query_param_matcher(expected_query)],
        json=[scan_result],
    )

    client.scan_and_create_incidents(
        [{"filename": FILENAME, "document": DOCUMENT}],
        source_uuid="123e4567-e89b-12d3-a456-426614174000",
    )

    assert endpoint_mock.call_count == 1


@responses.activate
def test_scan_and_create_incidents_payload_structure(client: GGClient):
    """
    GIVEN a ggclient
    WHEN scan_and_create_incidents is called with one document and a source UUID
    THEN the JSON body carries the source_uuid and the document as a
         document / document_identifier pair
    """

    source_uuid = "123e4567-e89b-12d3-a456-426614174000"
    documents = [{"filename": FILENAME, "document": DOCUMENT}]

    # The mocked endpoint only answers when the request body matches this JSON.
    body_matcher = matchers.json_params_matcher(
        {
            "documents": [
                {"document": DOCUMENT, "document_identifier": FILENAME},
            ],
            "source_uuid": source_uuid,
        }
    )
    empty_scan_result = {
        "policy_break_count": 0,
        "policies": ["pol"],
        "policy_breaks": [],
    }

    endpoint_mock = responses.post(
        url=client._url_from_endpoint("scan/create-incidents", "v1"),
        status=200,
        match=[body_matcher],
        json=[empty_scan_result],
    )

    client.scan_and_create_incidents(documents, source_uuid)

    assert endpoint_mock.call_count == 1