Skip to content

feat: handle new endpoint to scan documents and create incidents for the found secrets #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added

- New `GGClient.scan_and_create_incidents()` function that scans content for secrets and automatically creates incidents for any findings.
68 changes: 68 additions & 0 deletions pygitguardian/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast
from uuid import UUID

import requests
from requests import Response, Session, codes
Expand Down Expand Up @@ -510,6 +511,7 @@ def multi_content_scan(
else:
raise TypeError("each document must be a dict")

# Validate documents using DocumentSchema
for document in request_obj:
DocumentSchema.validate_size(
document, self.secret_scan_preferences.maximum_document_size
Expand Down Expand Up @@ -538,6 +540,72 @@ def multi_content_scan(

return obj

def scan_and_create_incidents(
    self,
    documents: List[Dict[str, str]],
    source_uuid: UUID,
    *,
    extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, MultiScanResult]:
    """
    scan_and_create_incidents handles the /scan/create-incidents endpoint of the API.

    If documents contain `0` bytes, they will be replaced with the ASCII substitute
    character.

    :param documents: List of dictionaries containing the keys document
    and, optionally, filename.
        example: [{"document":"example content","filename":"intro.py"}]
    :param source_uuid: the source UUID that will be used to identify the custom source, for which
    incidents will be created. Either a `UUID` instance or its string form is accepted.
    :param extra_headers: additional headers to add to the request
    :return: Detail or MultiScanResult response and status code
    :raises ValueError: if more documents are submitted than
    `secret_scan_preferences.maximum_documents_per_scan` allows
    :raises TypeError: if any entry of `documents` is not a dict
    """
    max_documents = self.secret_scan_preferences.maximum_documents_per_scan
    if len(documents) > max_documents:
        raise ValueError(
            f"too many documents submitted for scan (max={max_documents})"
        )

    if not all(isinstance(doc, dict) for doc in documents):
        raise TypeError("each document must be a dict")

    request_obj = cast(
        List[Dict[str, Any]], Document.SCHEMA.load(documents, many=True)
    )

    # Validate documents using DocumentSchema
    max_document_size = self.secret_scan_preferences.maximum_document_size
    for document in request_obj:
        DocumentSchema.validate_size(document, max_document_size)

    payload_documents: List[Dict[str, Any]] = []
    for document in request_obj:
        entry: Dict[str, Any] = {"document": document["document"]}
        # `filename` is documented as optional, so only forward an identifier
        # when one was supplied instead of failing with a KeyError.
        # NOTE(review): confirm the API accepts documents without
        # `document_identifier`; if it does not, the error is reported
        # through the returned Detail.
        if "filename" in document:
            entry["document_identifier"] = document["filename"]
        payload_documents.append(entry)

    payload = {
        # A UUID instance is not JSON serializable; send its canonical string
        # form. Strings pass through str() unchanged.
        "source_uuid": str(source_uuid),
        "documents": payload_documents,
    }
    resp = self.post(
        endpoint="scan/create-incidents",
        data=payload,
        extra_headers=extra_headers,
    )

    obj: Union[Detail, MultiScanResult]
    if is_ok(resp):
        # The endpoint answers with a bare list of scan results; wrap it in
        # the mapping shape MultiScanResult.from_dict is given here.
        obj = MultiScanResult.from_dict({"scan_results": resp.json()})
    else:
        obj = load_detail(resp)

    obj.status_code = resp.status_code

    return obj

def retrieve_secret_incident(
self, incident_id: int, with_occurrences: int = 20
) -> Union[Detail, SecretIncident]:
Expand Down
1 change: 1 addition & 0 deletions pygitguardian/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ class TokenScope(str, Enum):
CUSTOM_TAGS_READ = "custom_tags:read"
CUSTOM_TAGS_WRITE = "custom_tags:write"
SECRET_READ = "secrets:read"
SCAN_CREATE_INCIDENTS = "scan:create-incidents"


class APITokensResponseSchema(BaseSchema):
Expand Down
84 changes: 84 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,90 @@ def test_multiscan_parameters(client: GGClient, ignore_known_secrets, all_secret
assert mock_response.call_count == 1


@responses.activate
def test_scan_and_create_incidents_parameters(client: GGClient):
    """
    GIVEN a ggclient
    WHEN calling scan_and_create_incidents with parameters
    THEN the parameters are passed in the request
    """

    # The request is matched against an empty set of query parameters.
    expected_query = {}

    policy_break = {
        "type": "break",
        "detector_name": "break",
        "detector_group_name": "break",
        "documentation_url": None,
        "policy": "mypol",
        "matches": [{"match": "hello", "type": "hello"}],
    }
    scan_result = {
        "policy_break_count": 1,
        "policies": ["pol"],
        "policy_breaks": [policy_break],
    }

    endpoint_url = client._url_from_endpoint("scan/create-incidents", "v1")
    mocked_endpoint = responses.post(
        url=endpoint_url,
        status=200,
        match=[matchers.query_param_matcher(expected_query)],
        json=[scan_result],
    )

    single_document = {"filename": FILENAME, "document": DOCUMENT}
    client.scan_and_create_incidents(
        [single_document],
        source_uuid="123e4567-e89b-12d3-a456-426614174000",
    )

    # Exactly one HTTP call must have reached the mocked endpoint.
    assert mocked_endpoint.call_count == 1


@responses.activate
def test_scan_and_create_incidents_payload_structure(client: GGClient):
    """
    GIVEN a ggclient
    WHEN calling scan_and_create_incidents
    THEN the payload is structured correctly with documents and source_uuid
    """

    source_uuid = "123e4567-e89b-12d3-a456-426614174000"
    documents = [{"filename": FILENAME, "document": DOCUMENT}]

    # The client is expected to rename `filename` to `document_identifier`
    # and send the source UUID next to the documents.
    expected_document = {
        "document": DOCUMENT,
        "document_identifier": FILENAME,
    }
    expected_payload = {
        "documents": [expected_document],
        "source_uuid": source_uuid,
    }

    clean_scan_result = {
        "policy_break_count": 0,
        "policies": ["pol"],
        "policy_breaks": [],
    }

    endpoint_url = client._url_from_endpoint("scan/create-incidents", "v1")
    mocked_endpoint = responses.post(
        url=endpoint_url,
        status=200,
        match=[matchers.json_params_matcher(expected_payload)],
        json=[clean_scan_result],
    )

    client.scan_and_create_incidents(documents, source_uuid)

    # The mock only responds when the JSON body matches, so one recorded call
    # means the payload had the expected structure.
    assert mocked_endpoint.call_count == 1


@responses.activate
def test_retrieve_secret_incident(client: GGClient):
"""
Expand Down
Loading