Skip to content

Commit 635df02

Browse files
committed
[FIX] l10n_it_edi: remove_signature dependencies fix
There's an AttributeError issue with cryptography==42.0.8 and pyopenssl==24.1.0, where the PKCS7_NOVERIFY flag no longer exists in the cryptography module.

This PR backports and optimizes (2x) some homemade code introduced in saas-17.3 as a fallback for PyOpenSSL. See: PR odoo#137572

We could investigate fixing the calls to cryptography.hazmat.bindings._rust.test_support.pkcs7_verify, but it currently doesn't support the PKCS7_NOVERIFY flag. The pyca team has a PR to re-introduce it in Rust, but at the moment it is not available. See: pyca/cryptography#12116

PKCS7_NOVERIFY is useful because sometimes certificates are not valid, and yet we still have to read an invoice which is badly signed. We cannot take for granted that the Tax Agency checks that certificates are valid, since it doesn't even properly check the ASN1 structure.

References:
- PyOpenSSL doesn't support load_pkcs7_data anymore. pyca/pyopenssl@0fe822d
- Cryptography has removed PKCS7_NOVERIFY: pyca/cryptography@615967b and is migrating PKCS7_verify to Rust: https://github.com/pyca/cryptography/blob/43.0.x/src/rust/src/types.rs#L333
- The `asn1` library is pure Python and MIT licensed, but it is slower than our homemade solution: https://github.com/andrivet/python-asn1/blob/master/src/asn1.py
1 parent 4133952 commit 635df02

File tree

1 file changed

+242
-39
lines changed

1 file changed

+242
-39
lines changed
Lines changed: 242 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,254 @@
1-
# -*- coding:utf-8 -*-
2-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
1+
"""
2+
Italian E-invoice signed files content extraction.
33
4-
import logging
5-
import warnings
4+
- PyOpenSSL doesn't support ``load_pkcs7_data`` anymore.
5+
https://github.com/pyca/pyopenssl/commit/0fe822dc8d6610b8ec9ebaff626d6bf23e0a7ad3
6+
- Cryptography is migrating PKCS7_verify to Rust, and has removed PKCS7_NOVERIFY
7+
https://github.com/pyca/cryptography/commit/615967bfab5b49e470fe7d0df44649c69fb9a847
8+
https://github.com/pyca/cryptography/pull/8332
9+
- ``asn1`` library is pure Python and MIT licensed, but is slower than our homemade solution
10+
https://github.com/andrivet/python-asn1/blob/master/src/asn1.py
611
7-
_logger = logging.getLogger(__name__)
12+
This version is more optimized than what we had as a fallback.
13+
"""
814

9-
try:
10-
from OpenSSL import crypto as ssl_crypto
11-
import OpenSSL._util as ssl_util
12-
except ImportError:
13-
ssl_crypto = None
14-
_logger.warning("Cannot import library 'OpenSSL' for PKCS#7 envelope extraction.")
1515

16+
# Dotted form of the ObjectIdentifier that marks the pkcs7-data content type.
PKCS7_DATA_OID = '1.2.840.113549.1.7.1'

# Names of the ASN1 universal tags; the position in the list is the tag number.
# ``None`` marks the tag numbers that have no name in this table.
universal_tags = [
    'Zero',               # 0
    'Boolean',            # 1
    'Integer',            # 2
    'BitString',          # 3
    'OctetString',        # 4
    'Null',               # 5
    'ObjectIdentifier',   # 6
    'ObjectDescriptor',   # 7
    'External',           # 8
    'Real',               # 9
    'Enumerated',         # 10
    'EmbeddedPDV',        # 11
    'UTF8String',         # 12
    'RelativeOid',        # 13
    None,                 # 14
    None,                 # 15
    'Sequence',           # 16
    'Set',                # 17
    'NumericString',      # 18
    'PrintableString',    # 19
    'TeletexString',      # 20
    'VideotexString',     # 21
    'IA5String',          # 22
    'UTCTime',            # 23
    'GeneralizedTime',    # 24
    'GraphicString',      # 25
    'VisibleString',      # 26
    'GeneralString',      # 27
    'UniversalString',    # 28
    'CharacterString',    # 29
    'BMPString',          # 30
]
1626

17-
def remove_signature(content):
18-
""" Remove the PKCS#7 envelope from given content, making a '.xml.p7m' file content readable as it was '.xml'.
19-
As OpenSSL may not be installed, in that case a warning is issued and None is returned. """
2027

21-
# Prevent using the library if it had import errors
22-
if not ssl_crypto:
23-
_logger.warning("Error reading the content, check if the OpenSSL library is installed for for PKCS#7 envelope extraction.")
24-
return None
28+
def remove_signature(content, target=None):
    """ Return the PKCS7-data payload of a bytestring that is supposedly PKCS7 signed.

    :param content: bytestring read from the signed file
    :param target: optional object; when truthy, the name of the extraction
        method is recorded on its ``remove_signature_method`` attribute
    :return: the extracted payload, or ``content`` itself when the extraction
        fails for any reason
    """
    if target:
        setattr(target, 'remove_signature_method', '_remove_signature')
    try:
        extracted = _remove_signature(content)
    except Exception:
        # Best effort: whatever goes wrong, hand the input back untouched.
        extracted = content
    return extracted
2536

26-
# Load some tools from the library
27-
null = ssl_util.ffi.NULL
28-
verify = ssl_util.lib.PKCS7_verify
37+
def _remove_signature(content):
    """ Extract the pkcs7-data payload from a PKCS7 signed bytestring.

    The invoice content is inside an ASN1 node identified by PKCS7_DATA_OID (pkcs7-data).
    The node is defined as an OctetString, which can be composed of an arbitrary
    sequence of octets of string data.
    We visit in-order the ASN1 tree nodes until we find the pkcs7-data, then we look for content.
    Once we found it, we read all the OctetStrings that get yielded by the in-order visit.
    When there are no more OctetStrings, then another object will follow
    with its header and identifier, so we stop exploring and just return the content.

    See also:
    https://datatracker.ietf.org/doc/html/rfc2315
    https://www.oss.com/asn1/resources/asn1-made-simple/asn1-quick-reference/octetstring.html

    :param content: bytestring of the signed file
    :return: the concatenation of the OctetString chunks following the pkcs7-data identifier
    :raise ValueError: when the pkcs7-data identifier, or the data after it, is not found
    """
    # Chunks are collected and joined once, to avoid quadratic bytes concatenation
    chunks = []
    header_found = data_found = False
    for node in Reader().build_from_stream(content):
        if node.kind == 'ObjectIdentifier' and node.content == PKCS7_DATA_OID:
            header_found = True
        if header_found and node.kind == 'OctetString':
            data_found = True
            chunks.append(node.content)
        elif data_found:
            # First node that is not an OctetString after the data: we are done
            break
    if not header_found:
        raise ValueError("ASN1 Header not found")
    if not data_found:
        raise ValueError("ASN1 Content not found")
    return b''.join(chunks)
3265

33-
# Read the signed data fron the content
34-
out_buffer = ssl_crypto._new_mem_buf()
3566

36-
# This method is deprecated, but there are actually no alternatives
37-
with warnings.catch_warnings():
38-
warnings.filterwarnings("ignore", category=DeprecationWarning)
39-
try:
40-
loaded_data = ssl_crypto.load_pkcs7_data(ssl_crypto.FILETYPE_ASN1, content)
41-
except ssl_crypto.Error:
42-
_logger.warning("Error reading the content, PKCS#7 signature missing or invalid. Content will be tentatively used as it is.")
43-
return content
67+
class Asn1Node:
    """ Base class for Asn1 nodes.

    A node records its kind, the offset of its first byte in the stream,
    and the length of its content (``"?"`` when the length is indefinite).
    """
    _content = None
    is_primitive = False
    finalized = False
    # Offset of the first byte after the node, assigned by whoever finalizes it
    end_offset = None

    def __init__(self, kind, start_offset, node_len, header_len=2):
        """ Initialization of the Asn1 node

        :param kind: name of the node type
        :param start_offset: offset of the definition byte of the node
        :param node_len: length of the content, or ``"?"`` if indefinite
        :param header_len: number of definition and length bytes preceding
            the content. It defaults to 2 (one definition byte, one length byte),
            but lengths encoded on several bytes need a larger value.
        """
        self.kind = kind
        self.start_offset = start_offset
        self.length = node_len
        self.header_length = header_len

    def total_length(self):
        """ Get the total length of the node if defined. The definition and length bytes must be considered.

        :return: content length plus header length, or ``"?"`` if the length is indefinite
        """
        if self.length == "?":
            return "?"
        return self.length + self.header_length

    @property
    def content(self):
        """ Content of the node, as stored by the setter """
        return self._content

    @content.setter
    def content(self, content):
        self._content = content
90+
91+
92+
class PrimitiveNode(Asn1Node):
    """ Asn1 node that holds pure data (flagged through ``is_primitive``) """
    name = "Primitive"
    is_primitive = True
96+
97+
98+
class ObjectIdentifierNode(PrimitiveNode):
    """ Asn1 Object Identifier, i.e. 1.3.6.1.5.5.7.48.1 """

    @Asn1Node.content.setter
    def content(self, content):
        """ Decode the encoded identifier and store it as a dotted string.

        Every subidentifier is a base-128 number: each octet carries 7 bits of
        value, and the most significant bit is set on all octets but the last.
        The first subidentifier packs the first two arcs as ``40 * arc1 + arc2``,
        with ``arc1`` limited to 0, 1 or 2.

        :param content: encoded bytes of the identifier
        :raise ValueError: when the content is empty
        """
        if not content:
            raise ValueError("ASN1 ObjectIdentifier is empty")
        arcs = []
        value = 0
        for octet in content:
            value = (value << 7) | (octet & 0x7F)
            if octet & 0x80:
                # More octets follow for the current subidentifier
                continue
            if arcs:
                arcs.append(value)
            else:
                # Only arc 2 may have a second arc of 40 or more,
                # hence the first arc is capped instead of blindly divided.
                first_arc = min(value // 40, 2)
                arcs.extend((first_arc, value - 40 * first_arc))
            value = 0
        self._content = ".".join(str(arc) for arc in arcs)
113+
114+
115+
class Reader:
    """ Sequential reader of an ASN1 byte string.

    ``build_from_stream`` walks the bytes once and yields the nodes as they get
    completed: primitive nodes as soon as they are read, the other nodes once
    they are closed. Nodes that are still open are kept in ``open_nodes_stack``.
    """
    offset = 0
    root = None
    current_node = None
    last_open_node = None

    def __init__(self, *args, **kwargs):
        self.open_nodes_stack = []

    def finalize_last_open_node(self):
        """ Whenever a node is complete, it is finalized, and the references are updated

        :return: the node popped from the stack of open nodes
        """
        node = self.open_nodes_stack.pop()
        node.content = None
        self.current_node = None
        node.end_offset = self.offset
        node.finalized = True
        self.last_open_node = self.open_nodes_stack[-1] if self.open_nodes_stack else None
        return node

    def _is_complete(self, node):
        """ Tell whether an open node of definite length has been entirely read """
        if node is None or node.finalized or node.length == '?':
            return False
        # The real header size is used here: lengths encoded on several bytes
        # make the header longer than 2 bytes, and assuming 2 would close
        # the node before its last bytes are read.
        return node.start_offset + node.header_length + node.length <= self.offset

    def build_from_stream(self, stream):
        """ Build an Asn1 tree starting from a byte string from a p7m file

        :param stream: bytes to read
        :return: generator yielding the nodes as they are completed
        :raise ValueError: when the stream ends right after a definition byte
        """
        len_stream = len(stream)
        while self.offset < len_stream:

            start_offset = self.offset
            self.last_open_node = self.open_nodes_stack[-1] if self.open_nodes_stack else None

            # Read the definition and length bytes
            definition_byte = stream[self.offset]
            self.offset += 1
            node_len, self.offset = self.read_length(stream, self.offset)

            # A double \x00 closes the innermost open node
            if definition_byte == 0 and node_len == 0 and self.open_nodes_stack:
                yield self.finalize_last_open_node()
                continue

            # Create the current Node
            self.current_node = self.create_node(definition_byte, node_len, start_offset)
            # Remember how many bytes the definition and length took
            self.current_node.header_length = self.offset - start_offset
            if not self.root:
                self.root = self.current_node

            # If not primitive, add to the stack
            if not self.current_node.is_primitive:
                self.open_nodes_stack.append(self.current_node)
                self.last_open_node = self.current_node
            else:
                node = self.current_node
                new_offset = self.offset + node_len
                node.content = stream[self.offset:new_offset]
                self.offset = new_offset
                node.end_offset = new_offset
                node.finalized = True
                yield node

            # Clear the stack of all finished nodes
            while self._is_complete(self.last_open_node):
                yield self.finalize_last_open_node()

        return self.root

    def read_length(self, stream, offset):
        """ Read the length of a node.

        :param stream: bytes to read
        :param offset: offset of the first length byte
        :return: (length of the node, updated offset); the length is ``'?'``
            when it is indefinite
        :raise ValueError: when there is no byte to read at ``offset``
        """
        first_byte = stream[offset:offset + 1]
        if not first_byte:
            raise ValueError("Unexpected end of ASN1 stream while reading a length")
        first_byte_val = first_byte[0]
        if first_byte_val == 0x80:
            # If it's the only bit being set, the length is indefinite,
            # and the node will terminate with a double \x00
            return '?', offset + 1
        if first_byte_val < 0x80:
            # The first bit is off: the byte is the length itself.
            # A zero here is probably the second byte of a closing tag (\x00 \x00 <--)
            return first_byte_val, offset + 1
        # The first bit is on: the other 7 bits tell how many bytes encode
        # the length, most significant byte first.
        offset += 1
        length_bytes_no = first_byte_val & 0x7F
        node_len = int.from_bytes(stream[offset:offset + length_bytes_no], 'big')
        return node_len, offset + length_bytes_no

    def create_node(self, definition_byte, node_len, start_offset):
        """ Method to create new Asn1 nodes, given the definition bytes and the offset

        :param definition_byte: first byte of the node, as an integer
        :param node_len: length of the content, or ``'?'`` if indefinite
        :param start_offset: offset of the definition byte in the stream
        :return: a primitive node when the definition byte is of universal class
            with the constructed bit off, a generic ``Asn1Node`` otherwise
        """
        # Bits 8 and 7 hold the class, both off means Universal;
        # bit 6 is on when the node is made of other nodes.
        is_universal = not definition_byte & 0xC0
        is_constructed = bool(definition_byte & 0x20)
        if is_universal and not is_constructed:
            kind = universal_tags[definition_byte & 0x1F]
            if kind == 'ObjectIdentifier':
                target_class = ObjectIdentifierNode
            else:
                target_class = PrimitiveNode
        else:
            # NOTE(review): non-universal nodes with the constructed bit off
            # also end up here and are read as containers - confirm it is intended.
            kind = "Indefinite" if node_len == "?" else "Container"
            target_class = Asn1Node
        return target_class(kind, start_offset, node_len)
229+
230+
231+
if __name__ == '__main__':
    # Usage:
    #   python remove_signature.py /path/to/einvoice.xml.p7m [times]
    #
    # Without ``times`` the extracted XML is printed.
    # With ``times`` the extraction is repeated that many times under the
    # profiler, and the profiling statistics are printed instead.
    import sys
    from lxml import etree
    from cProfile import Profile
    from pstats import SortKey, Stats

    filename = sys.argv[1]
    times = sys.argv[2] if len(sys.argv) > 2 else False

    with open(filename, 'rb') as f:
        content = f.read().rstrip()

    if not times:
        result = remove_signature(content)
        parser = etree.XMLParser(recover=True, resolve_entities=False)
        print(etree.tostring(etree.fromstring(result, parser)).decode())
    else:
        with Profile() as profile:
            for i in range(int(times)):
                result = remove_signature(content)
        stats = Stats(profile)
        stats.strip_dirs().sort_stats(SortKey.CALLS).print_stats()

0 commit comments

Comments
 (0)