Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion keylime/registrar_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from keylime import api_version as keylime_api_version
from keylime import crypto, json, keylime_logging
from keylime.requests_client import RequestsClient
from keylime.tpm import amd_vtpm

logger = keylime_logging.init_logging("registrar_client")
api_version = keylime_api_version.current_version()
Expand Down Expand Up @@ -93,7 +94,7 @@ def doRegisterAgent(
"ekcert": ekcert,
"aik_tpm": aik_tpm,
}
if ekcert is None or ekcert == "emulator":
if ekcert is None or ekcert == "emulator" or amd_vtpm.is_amd_vtpm(base64.b64decode(ekcert)):
data["ek_tpm"] = ek_tpm

if mtls_cert is not None:
Expand Down
14 changes: 11 additions & 3 deletions keylime/registrar_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from keylime.db.keylime_db import DBEngineManager, SessionManager
from keylime.db.registrar_db import RegistrarMain
from keylime.tpm import tpm2_objects
from keylime.tpm import amd_vtpm
from keylime.tpm.tpm_main import tpm

logger = keylime_logging.init_logging("registrar")
Expand Down Expand Up @@ -259,9 +260,16 @@ def do_POST(self):

initialize_tpm = tpm()

if ekcert is None or ekcert == "emulator":
logger.warning("Agent %s did not submit an ekcert", agent_id)
ek_tpm = json_body["ek_tpm"]
if ekcert is None or ekcert == 'emulator':
logger.warning('Agent %s did not submit an ekcert' % agent_id)
ek_tpm = json_body['ek_tpm']
elif amd_vtpm.is_amd_vtpm(base64.b64decode(ekcert)):
logger.info('Agent %s is using an amd vTPM' % agent_id)
if amd_vtpm.is_amd_vtpm_ek_valid(base64.b64decode(json_body['ek_tpm']), base64.b64decode(ekcert)):
ek_tpm = json_body['ek_tpm']
else:
logger.error('Agent does not have a valid EKpub digest')
return
else:
if "ek_tpm" in json_body:
# This would mean the agent submitted both a non-None ekcert, *and*
Expand Down
8 changes: 7 additions & 1 deletion keylime/tenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from keylime.common import algorithms, retry, states, validators
from keylime.requests_client import RequestsClient
from keylime.tpm import tpm2_objects
from keylime.tpm import amd_vtpm
from keylime.tpm.tpm_abstract import TPM_Utilities
from keylime.tpm.tpm_main import tpm

Expand Down Expand Up @@ -430,6 +431,11 @@ def check_ek(self, ekcert):
elif ekcert is None:
logger.warning("No EK cert provided, require_ek_cert option in config set to True")
return False
elif amd_vtpm.is_amd_vtpm(base64.b64decode(ekcert)):
if not amd_vtpm.verify_ekcert_surrogate(base64.b64decode(ekcert), self.mb_refstate):
logger.warning("Invalid attestation report in amd vTPM")
return False
return True
elif not self.tpm_instance.verify_ek(base64.b64decode(ekcert), config.get("tenant", "tpm_cert_store")):
logger.warning("Invalid EK certificate")
return False
Expand Down Expand Up @@ -492,7 +498,7 @@ def validate_tpm_quote(self, public_key, quote, hash_alg):

# check all EKs with optional script:
script = config.get("tenant", "ek_check_script")
if not script:
if not script or amd_vtpm.is_amd_vtpm(base64.b64decode(self.registrar_data["ekcert"])):
return True

if script[0] != "/":
Expand Down
83 changes: 83 additions & 0 deletions keylime/tpm/amd_vtpm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from keylime import keylime_logging
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import utils, ec
import requests
import struct
import json

logger = keylime_logging.init_logging('vtpm')

def measurement_allowed(measurement, allowed_list_json):
for i in allowed_list_json:
if i['measure'] == measurement:
return True

return False

def get_vcek(EKcert_surrogate):
tcb=struct.unpack('<BB4xBB', EKcert_surrogate[0x180:0x188])
chipID=struct.unpack('<64s', EKcert_surrogate[0x1a0:0x1E0])[0].hex()
url = ("https://kdsintf.amd.com/vcek/v1/Milan/" + chipID +
"?blSPL=%02d"%tcb[0] + "&teeSPL=%02d"%tcb[1] +
"&snpSPL=%02d"%tcb[2] + "&ucodeSPL=%02d"%tcb[3])

response = requests.get(url)

if (response.status_code < 200 or response.status_code > 299):
logger.error("Could not download vcek")
return None

return response.content

def is_amd_vtpm_ek_valid(EKpub, EKcert_surrogate):
ekpub_sha512=struct.unpack('<64s', EKcert_surrogate[0x50:0x90])[0]
hasher = hashes.Hash(hashes.SHA512())
hasher.update(EKpub)
digest = hasher.finalize()
return (digest == ekpub_sha512)

# TODO: is this good enough?
def is_amd_vtpm(EKcert_surrogate):
version = struct.unpack('<I', EKcert_surrogate[0x0:0x4])[0]
vmpl=struct.unpack('<I', EKcert_surrogate[0x30:0x34])[0]
return (vmpl == 0) and (version == 2) and (len(EKcert_surrogate) == 1184)

def verify_ekcert_surrogate(EKcert_surrogate, mb_refstate: dict):
vmpl=struct.unpack('<I', EKcert_surrogate[0x30:0x34])[0]
if (vmpl > 0):
logger.error("Using an attestation report from VMPL %d"%vmpl)
return False

# load SEV launch measurement, compare with measured boot refstate (if there is one)
alm="0x" + struct.unpack('<48s', EKcert_surrogate[0x90:0xC0])[0].hex()
logger.info("actual launch measurement=%s"%(alm))
if not (mb_refstate and 'launch_measurements' in mb_refstate):
logger.info("SEV launch measurement ignored because mb_refstate does not have launch measurements")
elif alm in mb_refstate['launch_measurements']:
logger.info("SEV launch measurement found in measured boot refstate")
else:
logger.error("SEV launch measurement does not match any provided in the measured boot refstate")
return False

sigRS=struct.unpack('<72s72s368x', EKcert_surrogate[0x2A0:0x4A0])
R=int.from_bytes(sigRS[0], 'little')
S=int.from_bytes(sigRS[1], 'little')
signature = utils.encode_dss_signature(R,S)

hasher = hashes.Hash(hashes.SHA384())
hasher.update(EKcert_surrogate[0:0x2A0])
digest = hasher.finalize()

vcek = get_vcek(EKcert_surrogate)
cert = x509.load_der_x509_certificate(vcek)
public_key = cert.public_key()

try:
public_key.verify(signature, digest, ec.ECDSA(utils.Prehashed(hashes.SHA384())))
except Exception as e:
logger.error("Failed to verify signature, %s"%(str(e)))
return False

logger.info("Successfully verified amd vTPM attestation report")
return True