Skip to content

Commit df00158

Browse files
✨ add support for HMAC signature usage for localresponses
1 parent ce2c775 commit df00158

File tree

5 files changed

+129
-22
lines changed

5 files changed

+129
-22
lines changed

mindee/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,9 @@ def load_prediction(
190190
:return:
191191
"""
192192
try:
193-
if local_response.json.get("job"):
194-
return AsyncPredictResponse(product_class, local_response.json)
195-
return PredictResponse(product_class, local_response.json)
193+
if local_response.as_dict.get("job"):
194+
return AsyncPredictResponse(product_class, local_response.as_dict)
195+
return PredictResponse(product_class, local_response.as_dict)
196196
except KeyError as exc:
197197
raise MindeeError("No prediction found in local response.") from exc
198198

mindee/input/local_response.py

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,90 @@
1+
import hashlib
2+
import hmac
3+
import io
14
import json
25
from pathlib import Path
3-
from typing import Union, BinaryIO, Dict, Any
4-
import io
6+
from typing import Any, BinaryIO, Dict, Union
57

68
from mindee.error import MindeeError
79

810

911
class LocalResponse:
10-
json: Dict[str, Any]
12+
"""Local response loaded from a file."""
13+
14+
_file: BinaryIO
15+
"""File object of the local response."""
1116

1217
def __init__(self, input_file: Union[BinaryIO, str, Path, bytes]):
13-
input_binary: BinaryIO
1418
if isinstance(input_file, BinaryIO):
15-
input_binary = input_file
16-
input_binary.seek(0)
17-
elif isinstance(input_file, str) or isinstance(input_file, Path):
18-
with open(input_file, 'rb') as f:
19-
input_binary = io.BytesIO(f.read())
19+
self._file = input_file
20+
self._file.seek(0)
21+
elif isinstance(input_file, (str, Path)):
22+
with open(input_file, "r", encoding="utf-8") as file:
23+
self._file = io.BytesIO(
24+
file.read().replace("\r", "").replace("\n", "").encode()
25+
)
2026
elif isinstance(input_file, bytes):
21-
input_binary = io.BytesIO(input_file)
27+
self._file = io.BytesIO(input_file)
2228
else:
23-
raise TypeError('Incompatible type for input.')
29+
raise MindeeError("Incompatible type for input.")
30+
31+
@property
32+
def as_dict(self) -> Dict[str, Any]:
33+
"""
34+
Returns the dictionary representation of the file.
35+
36+
:return: A json-like dictionary.
37+
"""
2438
try:
25-
input_binary.seek(0)
26-
self.json = json.load(input_binary)
27-
input_binary.close()
39+
self._file.seek(0)
40+
out_json = json.loads(self._file.read())
2841
except json.decoder.JSONDecodeError as exc:
29-
raise MindeeError('File is not a valid dictionary.') from exc
42+
raise MindeeError("File is not a valid dictionary.") from exc
43+
return out_json
44+
45+
@staticmethod
46+
def _process_secret_key(
47+
secret_key: Union[str, bytes, bytearray]
48+
) -> Union[bytes, bytearray]:
49+
"""
50+
Processes the secret key as a byte array.
51+
52+
:param secret_key: Secret key, either a string or a byte/byte array.
53+
:return: a byte/byte array secret key.
54+
"""
55+
if isinstance(secret_key, (bytes, bytearray)):
56+
return secret_key
57+
return secret_key.encode("utf-8")
58+
59+
def get_hmac_signature(self, secret_key: Union[str, bytes, bytearray]):
60+
"""
61+
Returns the hmac signature of the local response, from the secret key provided.
62+
63+
:param secret_key: Secret key, either a string or a byte/byte array.
64+
:return: The hmac signature of the local response.
65+
"""
66+
algorithm = hashlib.sha256
67+
68+
try:
69+
self._file.seek(0)
70+
mac = hmac.new(
71+
LocalResponse._process_secret_key(secret_key),
72+
self._file.read(),
73+
algorithm,
74+
)
75+
except (TypeError, ValueError) as exc:
76+
raise MindeeError("Could not get HMAC signature from payload.") from exc
77+
78+
return mac.hexdigest()
79+
80+
def is_valid_hmac_signature(
81+
self, secret_key: Union[str, bytes, bytearray], signature: str
82+
):
83+
"""
84+
Checks if the hmac signature of the local response is valid.
85+
86+
:param secret_key: Secret key, given as a string.
87+
:param signature:
88+
:return: True if the HMAC signature is valid.
89+
"""
90+
return signature == self.get_hmac_signature(secret_key)

tests/Input/test_local_response.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from pathlib import Path
2+
3+
import pytest
4+
5+
from mindee.input import LocalResponse
6+
from tests.api.test_async_response import ASYNC_DIR
7+
8+
9+
@pytest.fixture
10+
def dummy_secret_key():
11+
return "ogNjY44MhvKPGTtVsI8zG82JqWQa68woYQH"
12+
13+
@pytest.fixture
14+
def signature():
15+
return "5ed1673e34421217a5dbfcad905ee62261a3dd66c442f3edd19302072bbf70d0"
16+
17+
@pytest.fixture
18+
def file_path():
19+
return Path(ASYNC_DIR / "get_completed_empty.json")
20+
21+
def test_valid_file_local_response(dummy_secret_key, signature, file_path):
22+
local_response = LocalResponse(file_path)
23+
assert local_response._file is not None
24+
assert not local_response.is_valid_hmac_signature(dummy_secret_key, "invalid signature")
25+
assert signature == local_response.get_hmac_signature(dummy_secret_key)
26+
assert local_response.is_valid_hmac_signature(dummy_secret_key, signature)
27+
28+
def test_valid_path_local_response(dummy_secret_key, signature, file_path):
29+
local_response = LocalResponse(file_path)
30+
assert local_response._file is not None
31+
assert not local_response.is_valid_hmac_signature(dummy_secret_key, "invalid signature")
32+
assert signature == local_response.get_hmac_signature(dummy_secret_key)
33+
assert local_response.is_valid_hmac_signature(dummy_secret_key, signature)
34+
35+
def test_valid_bytes_local_response(dummy_secret_key, signature, file_path):
36+
with open(file_path, 'r') as f:
37+
str_response = f.read().replace('\r', '').replace('\n', '')
38+
file_bytes = str_response.encode('utf-8')
39+
local_response = LocalResponse(file_bytes)
40+
assert local_response._file is not None
41+
assert not local_response.is_valid_hmac_signature(dummy_secret_key, "invalid signature")
42+
assert signature == local_response.get_hmac_signature(dummy_secret_key)
43+
assert local_response.is_valid_hmac_signature(dummy_secret_key, signature)
44+

tests/api/test_async_response.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
from pathlib import Path
23

34
import pytest
45
import requests
@@ -10,7 +11,7 @@
1011
from mindee.parsing.common.async_predict_response import AsyncPredictResponse
1112
from mindee.product.invoice_splitter import InvoiceSplitterV1
1213

13-
ASYNC_DIR = "./tests/data/async"
14+
ASYNC_DIR = Path("./tests/data/async")
1415

1516
FILE_PATH_POST_SUCCESS = f"{ASYNC_DIR}/post_success.json"
1617
FILE_PATH_POST_FAIL = f"{ASYNC_DIR}/post_fail_forbidden.json"

tests/test_client.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,11 @@ def test_local_response_from_async_json(dummy_client: Client):
143143

144144

145145
def test_local_response_from_invalid_file(dummy_client: Client):
146+
local_response = LocalResponse(
147+
PRODUCT_DATA_DIR / "invoices" / "response_v4" / "summary_full.rst"
148+
)
146149
with pytest.raises(MindeeError):
147-
LocalResponse(
148-
PRODUCT_DATA_DIR / "invoices" / "response_v4" / "summary_full.rst"
149-
)
150+
print(local_response.as_dict)
150151

151152

152153
def test_local_response_from_invalid_dict(dummy_client: Client):

0 commit comments

Comments
 (0)