Skip to content

Allow configurable timeouts for the scan methods that submit large byte bodies. #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 114 additions & 20 deletions arachnid_shield_sdk/api/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@
)


# Timeout profile used as the default by the scan methods that upload raw
# media bytes. Values are in seconds, as interpreted by httpx.
TIMEOUT_WRITE_PERMISSIVE = httpx.Timeout(
    # No limit on sending the request body: a large chunk can take
    # arbitrarily long to write, so writes are allowed to run unbounded.
    write=None,
    # Limit applied while establishing the connection.
    connect=3,
    # Every operation not overridden above falls back to this value.
    timeout=60,
)

# Timeout profile used as the default by the URL and PDQ-hash scan methods.
# Values are in seconds, as interpreted by httpx.
TIMEOUT_READ_PERMISSIVE = httpx.Timeout(
    # Give the server enough time to process the request and for the
    # response to be read back.
    read=60,
    # Limit applied while establishing the connection.
    connect=3,
    # Every operation not overridden above falls back to this value.
    timeout=60,
)


class ArachnidShield(_ArachnidShield):
"""A client to communicate with the Arachnid Shield API
provided by the Canadian Centre for Child Protection.
Expand All @@ -30,13 +46,20 @@ def __init__(self, username: typing.Union[str, bytes], password: typing.Union[st
super().__init__(username=username, password=password)
self.__client = super()._build_sync_http_client()

def scan_media_from_bytes(self, contents: typing.Union[bytes, io.BytesIO], mime_type: str) -> ScannedMedia:
def scan_media_from_bytes(
self,
contents: typing.Union[bytes, io.BytesIO],
mime_type: str,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_WRITE_PERMISSIVE,
) -> ScannedMedia:
"""Given the contents of some media, along with a mime type,
scan the contents for matches against known child abuse media.

Args:
contents: The raw bytes that represent the media.
mime_type: The mimetype of the media.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
The record of a successful media scan.
Expand All @@ -45,10 +68,13 @@ def scan_media_from_bytes(self, contents: typing.Union[bytes, io.BytesIO], mime_
`ArachnidShieldError` on a failed but complete interaction with
the Arachnid Shield API, and `httpx.HTTPError` on any other connection failures.
"""
return self.scan_media_from_bytes_with_config(ScanMediaFromBytes(contents=contents, mime_type=mime_type))
return self.scan_media_from_bytes_with_config(ScanMediaFromBytes(contents=contents, mime_type=mime_type), timeout=timeout)

def scan_media_from_file(
self, filepath: pathlib.Path, mime_type_override: typing.Optional[str] = None
self,
filepath: pathlib.Path,
mime_type_override: typing.Optional[str] = None,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_WRITE_PERMISSIVE,
) -> ScannedMedia:
"""Given path to the media file to scan, and an optional
value for mime_type that bypasses guessing it based on the filepath,
Expand All @@ -60,6 +86,8 @@ def scan_media_from_file(
mime_type_override:
If provided, will use this as the mime_type
instead of guessing it from the filepath.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
The record of a successful media scan.
Expand All @@ -78,7 +106,7 @@ def scan_media_from_file(
detail=(
f"Failed to identify mime_type for {filepath}. "
f"You may specify it explicitly by providing "
f"`force_mime_type`."
f"`mime_type_override`."
)
)
)
Expand All @@ -87,14 +115,20 @@ def scan_media_from_file(
contents = f.read()

config = ScanMediaFromBytes(contents=contents, mime_type=mime_type)
return self.scan_media_from_bytes_with_config(config)
return self.scan_media_from_bytes_with_config(config, timeout=timeout)

def scan_media_from_url(self, url: str) -> ScannedMedia:
def scan_media_from_url(
self,
url: str,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_READ_PERMISSIVE,
) -> ScannedMedia:
"""Given the absolute url that hosts the media we wish to scan,
scan the contents of that url for matches against known harmful content.

Args:
url: The absolute URL to scan.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
The record of a successful media scan.
Expand All @@ -103,14 +137,20 @@ def scan_media_from_url(self, url: str) -> ScannedMedia:
`ArachnidShieldError` on a failed but complete interaction with
the Arachnid Shield API, and `httpx.HTTPError` on any other connection failures.
"""
return self.scan_media_from_url_with_config(ScanMediaFromUrl(url=url))
return self.scan_media_from_url_with_config(ScanMediaFromUrl(url=url), timeout=timeout)

def scan_media_from_bytes_with_config(self, config: ScanMediaFromBytes) -> ScannedMedia:
def scan_media_from_bytes_with_config(
self,
config: ScanMediaFromBytes,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_WRITE_PERMISSIVE,
) -> ScannedMedia:
"""Given the contents of some media, along with a mime type,
scan the contents for matches against known child abuse media.

Args:
config: The context that will be used to build the request.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
ScannedMedia: A record of a successful scan of the media.
Expand All @@ -125,6 +165,7 @@ def scan_media_from_bytes_with_config(self, config: ScanMediaFromBytes) -> Scann
url=url,
headers={"Content-Type": config.mime_type},
content=config.contents,
timeout=timeout,
)

if response.is_client_error or response.is_server_error:
Expand All @@ -134,12 +175,18 @@ def scan_media_from_bytes_with_config(self, config: ScanMediaFromBytes) -> Scann
response.raise_for_status()
return ScannedMedia.from_dict(response.json())

def scan_media_from_url_with_config(self, config: ScanMediaFromUrl) -> ScannedMedia:
def scan_media_from_url_with_config(
self,
config: ScanMediaFromUrl,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_READ_PERMISSIVE,
) -> ScannedMedia:
"""Given the absolute url that hosts the media we wish to scan,
scan the contents of that url for matches against known harmful content.

Args:
config: The context that will be used to build the request.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
ScannedMedia: A record of a successful scan of the media.
Expand All @@ -155,6 +202,7 @@ def scan_media_from_url_with_config(self, config: ScanMediaFromUrl) -> ScannedMe
url=_url,
headers={"Content-Type": "application/json"},
json=config.to_dict(),
timeout=timeout,
)

if response.is_client_error or response.is_server_error:
Expand All @@ -164,11 +212,17 @@ def scan_media_from_url_with_config(self, config: ScanMediaFromUrl) -> ScannedMe
response.raise_for_status()
return ScannedMedia.from_dict(response.json())

def scan_pdq_hashes(self, config: ScanMediaFromPdq) -> ScannedPDQHashes:
def scan_pdq_hashes(
self,
config: ScanMediaFromPdq,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_READ_PERMISSIVE,
) -> ScannedPDQHashes:
"""
Scan media for CSAM based on their PDQ hashes.
Args:
config: The context that will be used to build the request.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
ScannedPDQHashes: A record of a batch of PDQ hashes that have been scanned by the Arachnid Shield API
Expand All @@ -183,6 +237,7 @@ def scan_pdq_hashes(self, config: ScanMediaFromPdq) -> ScannedPDQHashes:
url=_url,
headers={"Content-Type": "application/json"},
json=config.to_dict(),
timeout=timeout,
)
if response.is_client_error or response.is_server_error:
error_detail = ErrorDetail.from_dict(response.json())
Expand All @@ -203,13 +258,20 @@ def __init__(self, username: typing.Union[str, bytes], password: typing.Union[st
super().__init__(username=username, password=password)
self.__client = super()._build_async_http_client()

async def scan_media_from_bytes(self, contents: typing.Union[bytes, io.BytesIO], mime_type: str) -> ScannedMedia:
async def scan_media_from_bytes(
self,
contents: typing.Union[bytes, io.BytesIO],
mime_type: str,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_WRITE_PERMISSIVE,
) -> ScannedMedia:
"""Given the contents of some media, along with a mime type,
scan the contents for matches against known child abuse media.

Args:
contents: The raw bytes that represent the media.
mime_type: The mimetype of the media.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
The record of a successful media scan.
Expand All @@ -219,14 +281,20 @@ async def scan_media_from_bytes(self, contents: typing.Union[bytes, io.BytesIO],
the Arachnid Shield API, and `httpx.HTTPError` on any other connection failures.
"""

return await self.scan_media_from_bytes_with_config(ScanMediaFromBytes(contents=contents, mime_type=mime_type))
return await self.scan_media_from_bytes_with_config(ScanMediaFromBytes(contents=contents, mime_type=mime_type), timeout=timeout)

async def scan_media_from_url(self, url: str) -> ScannedMedia:
async def scan_media_from_url(
self,
url: str,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_READ_PERMISSIVE,
) -> ScannedMedia:
"""Given the absolute url that hosts the media we wish to scan,
scan the contents of that url for matches against known harmful content.

Args:
url: The absolute URL to scan.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
The record of a successful media scan.
Expand All @@ -235,10 +303,13 @@ async def scan_media_from_url(self, url: str) -> ScannedMedia:
`ArachnidShieldError` on a failed but complete interaction with
the Arachnid Shield API, and `httpx.HTTPError` on any other connection failures.
"""
return await self.scan_media_from_url_with_config(ScanMediaFromUrl(url=url))
return await self.scan_media_from_url_with_config(ScanMediaFromUrl(url=url), timeout=timeout)

async def scan_media_from_file(
self, filepath: pathlib.Path, mime_type_override: typing.Optional[str] = None
self,
filepath: pathlib.Path,
mime_type_override: typing.Optional[str] = None,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_WRITE_PERMISSIVE,
) -> ScannedMedia:
"""Given path to the media file to scan, and an optional
value for mime_type that bypasses guessing it based on the filepath,
Expand All @@ -250,6 +321,8 @@ async def scan_media_from_file(
mime_type_override:
If provided, will use this as the mime_type
instead of guessing it from the filepath.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
The record of a successful media scan.
Expand All @@ -268,7 +341,7 @@ async def scan_media_from_file(
detail=(
f"Failed to identify mime_type for {filepath}. "
f"You may specify it explicitly by providing "
f"`force_mime_type`."
f"`mime_type_override`."
)
)
)
Expand All @@ -277,14 +350,20 @@ async def scan_media_from_file(
contents = f.read()

config = ScanMediaFromBytes(contents=contents, mime_type=mime_type)
return await self.scan_media_from_bytes_with_config(config)
return await self.scan_media_from_bytes_with_config(config, timeout=timeout)

async def scan_media_from_bytes_with_config(self, config: ScanMediaFromBytes) -> ScannedMedia:
async def scan_media_from_bytes_with_config(
self,
config: ScanMediaFromBytes,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_WRITE_PERMISSIVE,
) -> ScannedMedia:
"""Given the contents of some media, along with a mime type,
scan the contents for matches against known child abuse media.

Args:
config: The context that will be used to build the request.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
ScannedMedia: A record of a successful scan of the media.
Expand All @@ -300,6 +379,7 @@ async def scan_media_from_bytes_with_config(self, config: ScanMediaFromBytes) ->
url=url,
headers={"Content-Type": config.mime_type},
content=config.contents,
timeout=timeout,
)

if response.is_client_error or response.is_server_error:
Expand All @@ -309,12 +389,18 @@ async def scan_media_from_bytes_with_config(self, config: ScanMediaFromBytes) ->
response.raise_for_status()
return ScannedMedia.from_dict(response.json())

async def scan_media_from_url_with_config(self, config: ScanMediaFromUrl) -> ScannedMedia:
async def scan_media_from_url_with_config(
self,
config: ScanMediaFromUrl,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_READ_PERMISSIVE,
) -> ScannedMedia:
"""Given the absolute url that hosts the media we wish to scan,
scan the contents of that url for matches against known harmful content.

Args:
config: The context that will be used to build the request.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
ScannedMedia: A record of a successful scan of the media.
Expand All @@ -330,6 +416,7 @@ async def scan_media_from_url_with_config(self, config: ScanMediaFromUrl) -> Sca
url=_url,
headers={"Content-Type": "application/json"},
json=config.to_dict(),
timeout=timeout,
)

if response.is_client_error or response.is_server_error:
Expand All @@ -339,11 +426,17 @@ async def scan_media_from_url_with_config(self, config: ScanMediaFromUrl) -> Sca
response.raise_for_status()
return ScannedMedia.from_dict(response.json())

async def scan_pdq_hashes(self, config: ScanMediaFromPdq) -> ScannedPDQHashes:
async def scan_pdq_hashes(
self,
config: ScanMediaFromPdq,
timeout: typing.Optional[httpx.Timeout] = TIMEOUT_READ_PERMISSIVE,
) -> ScannedPDQHashes:
"""
Scan media for CSAM based on their PDQ hashes.
Args:
config: The context that will be used to build the request.
timeout:
If provided, will set a timeout configuration for the underlying http client.

Returns:
ScannedPDQHashes: A record of a batch of PDQ hashes that have been scanned by the Arachnid Shield API
Expand All @@ -358,6 +451,7 @@ async def scan_pdq_hashes(self, config: ScanMediaFromPdq) -> ScannedPDQHashes:
url=_url,
headers={"Content-Type": "application/json"},
json=config.to_dict(),
timeout=timeout,
)
if response.is_client_error or response.is_server_error:
error_detail = ErrorDetail.from_dict(response.json())
Expand Down