Skip to content

Commit e61f84b

Browse files
authored
Merge pull request #586 from Nikokrock/add_support_for_dsse
Add support for DSSE (Dead Simple Signing Envelope)
2 parents fffcf74 + dc677ab commit e61f84b

File tree

3 files changed

+237
-4
lines changed

3 files changed

+237
-4
lines changed

src/e3/dsse.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
from __future__ import annotations
2+
from e3.os.process import Run
3+
import base64
4+
import json
5+
import tempfile
6+
import os
7+
8+
9+
class DSSEError(Exception):
10+
pass
11+
12+
13+
class DSSE:
14+
"""DSSE: Dead Simple Signing Envelope.
15+
16+
The current implementation relies on openssl tool.
17+
"""
18+
19+
def __init__(self, body: str | bytes, payload_type: str) -> None:
20+
"""Initiazse a DSSE envelope.
21+
22+
:param body: the content to sign
23+
:param payload_type: the type of the payload
24+
"""
25+
if isinstance(body, str):
26+
self.body = body.encode("utf-8")
27+
else:
28+
self.body = body
29+
self.payload_type = payload_type
30+
self.signatures: list[dict[str, str]] = []
31+
32+
def sign(self, key_id: str, private_key: str) -> str:
33+
"""Sign the payload using openssl X509 certificate.
34+
35+
:param key_id: the key id (used by end-user to identify which key to use
36+
for verification).
37+
:param private_key: path to file containing the private key
38+
:return: return the signature as base64 string
39+
"""
40+
p = Run(
41+
["openssl", "dgst", "-sha256", "-sign", private_key, "-out", "-", "-"],
42+
input=b"|" + self.pae,
43+
)
44+
if p.status == 0 and p.raw_out is not None:
45+
base64_signature = base64.b64encode(p.raw_out).decode("utf-8")
46+
self.signatures.append({"keyid": key_id, "sig": base64_signature})
47+
return base64_signature
48+
else:
49+
raise DSSEError(f"SSL error: {p.out}")
50+
51+
def verify(self, certificate: str) -> bool:
52+
"""Preliminary check on the signature.
53+
54+
The current algorithm is to check that at least one signature correspond
55+
to the certificate given as parameter. This part should be improved
56+
57+
:param certifciate: path to the certificate containing the public key
58+
:return: True if one of the signature can be checked with the certificate
59+
"""
60+
# First get the public key
61+
p = Run(["openssl", "x509", "-pubkey", "-noout", "-in", certificate])
62+
if p.status != 0 or p.raw_out is None:
63+
raise DSSEError(f"Cannot fetch public key from {certificate}")
64+
public_key = p.raw_out
65+
66+
with tempfile.TemporaryDirectory() as temp_dir:
67+
with open(os.path.join(temp_dir, "pub.crt"), "wb") as fd:
68+
fd.write(public_key)
69+
70+
with open(os.path.join(temp_dir, "pae"), "wb") as fd:
71+
fd.write(self.pae)
72+
73+
for s in self.signatures:
74+
with open(os.path.join(temp_dir, "sig"), "wb") as fd:
75+
fd.write(base64.b64decode(s["sig"]))
76+
77+
p = Run(
78+
[
79+
"openssl",
80+
"dgst",
81+
"-verify",
82+
os.path.join(temp_dir, "pub.crt"),
83+
"-signature",
84+
os.path.join(temp_dir, "sig"),
85+
os.path.join(temp_dir, "pae"),
86+
],
87+
)
88+
if p.status == 0:
89+
return True
90+
return False
91+
92+
@property
93+
def payload(self) -> str:
94+
"""Return the content to sign as base64 string.
95+
96+
:return: a base64 string representing the content
97+
"""
98+
return base64.b64encode(self.body).decode("utf-8")
99+
100+
@property
101+
def pae(self) -> bytes:
102+
"""Return the Pre-Authentication Encoding.
103+
104+
This is the content that is really signed
105+
"""
106+
payload_type_bytes = self.payload_type.encode("utf-8")
107+
return b" ".join(
108+
(
109+
b"DSSEv1",
110+
str(len(payload_type_bytes)).encode("utf-8"),
111+
payload_type_bytes,
112+
str(len(self.body)).encode("utf-8"),
113+
self.body,
114+
)
115+
)
116+
117+
def as_dict(self) -> dict:
118+
"""Return the dict representing the DSSE envelope."""
119+
return {
120+
"payload": self.payload,
121+
"payloadType": self.payload_type,
122+
"signatures": self.signatures,
123+
}
124+
125+
def as_json(self) -> str:
126+
"""Return the DSSE envelope."""
127+
return json.dumps(self.as_dict())
128+
129+
@classmethod
130+
def load_json(cls, envelope: str) -> DSSE:
131+
"""Load a json DSSE string and return a Python DSSE object.
132+
133+
:param envelope: the json envelope
134+
"""
135+
return cls.load_dict(json.loads(envelope))
136+
137+
@classmethod
138+
def load_dict(cls, envelope: dict) -> DSSE:
139+
"""Load a dict and return a Python DSSE object.
140+
141+
:param envelope: the json envelope
142+
"""
143+
result = cls(
144+
body=base64.b64decode(envelope["payload"]),
145+
payload_type=envelope["payloadType"],
146+
)
147+
result.signatures = [
148+
{"keyid": sig["keyid"], "sig": sig["sig"]} for sig in envelope["signatures"]
149+
]
150+
return result

src/e3/os/process.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,12 @@ def __init__(
236236
cwd: str | None = None,
237237
output: STDOUT_VALUE | DEVNULL_VALUE | PIPE_VALUE | str | IO | None = PIPE,
238238
error: STDOUT_VALUE | DEVNULL_VALUE | PIPE_VALUE | str | IO | None = STDOUT,
239-
input: DEVNULL_VALUE | PIPE_VALUE | str | IO | None = None, # noqa: A002
239+
input: DEVNULL_VALUE # noqa: A002
240+
| PIPE_VALUE
241+
| str
242+
| bytes
243+
| IO
244+
| None = None, # noqa: A002
240245
bg: bool = False,
241246
timeout: int | None = None,
242247
env: dict | None = None,
@@ -630,8 +635,9 @@ class File:
630635
def __init__(self, name: Any, mode: str = "r"):
631636
"""Create a new File.
632637
633-
:param name: can be PIPE, STDOUT, a filename string, an opened fd, a
634-
python file object, or a command to pipe (if starts with ``|``)
638+
:param name: can be PIPE, STDOUT, a filename string, bytes, an opened
639+
fd, a python file object, or a command to pipe
640+
(if starts with ``|``)
635641
:param mode: can be 'r' or 'w' if name starts with + the mode will be
636642
a+
637643
"""
@@ -640,7 +646,9 @@ def __init__(self, name: Any, mode: str = "r"):
640646

641647
self.name = name
642648
self.to_close = False
643-
if isinstance(name, str):
649+
if isinstance(name, bytes) and mode == "r" and name.startswith(b"|"):
650+
self.fd = subprocess.PIPE
651+
elif isinstance(name, str):
644652
# can be a pipe or a filename
645653
if mode == "r" and name.startswith("|"):
646654
self.fd = subprocess.PIPE

tests/tests_e3/dsse_test.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
from e3.os.process import Run
2+
from e3.dsse import DSSE, DSSEError
3+
import pytest
4+
5+
6+
def test_dsse():
7+
# Generate a temporary x509 keypairs
8+
p = Run(
9+
[
10+
"openssl",
11+
"req",
12+
"-x509",
13+
"-newkey",
14+
"rsa:4096",
15+
"-keyout",
16+
"codesign1.key",
17+
"-out",
18+
"codesign1.crt",
19+
"-nodes",
20+
"-subj",
21+
"/CN=localhost",
22+
]
23+
)
24+
assert p.status == 0, f"openssl failed:\n{p.out}"
25+
26+
p = Run(
27+
[
28+
"openssl",
29+
"req",
30+
"-x509",
31+
"-newkey",
32+
"rsa:4096",
33+
"-keyout",
34+
"codesign2.key",
35+
"-out",
36+
"codesign2.crt",
37+
"-nodes",
38+
"-subj",
39+
"/CN=localhost",
40+
]
41+
)
42+
assert p.status == 0, f"openssl failed:\n{p.out}"
43+
44+
# Create an envelope for a string value
45+
d = DSSE(body='{"key": "value"}', payload_type="application/json")
46+
47+
# Check that we can sign and verify
48+
assert not d.verify("./codesign1.crt")
49+
d.sign("mykey", "./codesign1.key")
50+
assert d.verify("./codesign1.crt")
51+
assert not d.verify("./codesign2.crt")
52+
53+
# Create an envelope for a bytes value
54+
d = DSSE(body=b'{"key": "value"}', payload_type="application/json")
55+
56+
# Check that we can have several signatures
57+
assert not d.verify("./codesign1.crt")
58+
assert not d.verify("./codesign2.crt")
59+
d.sign("mykey", "./codesign1.key")
60+
d.sign("mykey", "./codesign2.key")
61+
assert d.verify("./codesign1.crt")
62+
assert d.verify("./codesign2.crt")
63+
64+
# Ensure that serializing and deserializing works
65+
dsse_envelope = d.as_json()
66+
d2 = DSSE.load_json(dsse_envelope)
67+
assert d2.verify("./codesign1.crt")
68+
assert d2.verify("./codesign2.crt")
69+
70+
with pytest.raises(DSSEError) as err:
71+
d2.verify("./unknown.crt")
72+
assert "Cannot fetch public key" in str(err)
73+
with pytest.raises(DSSEError) as err:
74+
d2.sign("unknown", "./unknown.key")
75+
assert "SSL error" in str(err)

0 commit comments

Comments
 (0)