|
| 1 | +#!/usr/bin/env python |
| 2 | + |
| 3 | +import argparse |
| 4 | +import logging |
| 5 | +import sys |
| 6 | +from binascii import hexlify |
| 7 | +from re import match |
| 8 | + |
| 9 | +from xrpl.clients import JsonRpcClient |
| 10 | +from xrpl.models.requests import LedgerEntry, Ledger |
| 11 | +from xrpl.utils import ripple_time_to_datetime |
| 12 | + |
| 13 | +# Set up logging -------------------------------------------------------------- |
| 14 | +# Use WARNING by default in case verify_credential is called from elsewhere. |
| 15 | +logger = logging.getLogger("verify_credential") |
| 16 | +logger.setLevel(logging.WARNING) |
| 17 | +logger.addHandler(logging.StreamHandler(sys.stderr)) |
| 18 | + |
| 19 | +# Define an error to throw when XRPL lookup fails unexpectedly ---------------- |
| 20 | +class XRPLLookupError(Exception): |
| 21 | + def __init__(self, xrpl_response): |
| 22 | + self.body = xrpl_response.result |
| 23 | + |
| 24 | +# Main function --------------------------------------------------------------- |
| 25 | +def verify_credential(client:JsonRpcClient, |
| 26 | + issuer:str, |
| 27 | + subject:str, |
| 28 | + credential_type:str="", |
| 29 | + credential_type_hex:str=""): |
| 30 | + """ |
| 31 | + Check whether an XRPL account holds a specified credential, |
| 32 | + as of the most recently validated ledger. |
| 33 | +
|
| 34 | + Paramters: |
| 35 | + client - JsonRpcClient for the XRPL network to use. |
| 36 | + issuer - Address of the credential issuer, in base58 |
| 37 | + subject - Address of the credential holder/subject, in base58 |
| 38 | + credential_type - Credential type to check for as a string, |
| 39 | + which will be encoded as UTF-8 (1-64 bytes long). |
| 40 | + credential_type_hex - Credential type (binary) as hexadecimal. |
| 41 | + verbose - If true, print details to stdout during lookup. |
| 42 | + You must provide either credential_type or credential_type_hex. |
| 43 | +
|
| 44 | + Returns True if the account holds the specified, valid credential. |
| 45 | + Returns False if the credential is missing, expired, or not accepted. |
| 46 | + """ |
| 47 | + # Handle function inputs -------------------------------------------------- |
| 48 | + if not (credential_type or credential_type_hex): |
| 49 | + raise ValueError("Provide a non-empty credential_type or " + |
| 50 | + "credential_type_hex") |
| 51 | + if credential_type and credential_type_hex: |
| 52 | + raise ValueError("Provide either credential_type or " + |
| 53 | + "credential_type_hex, but not both") |
| 54 | + |
| 55 | + # Encode credential_type as uppercase hex, if needed |
| 56 | + if credential_type: |
| 57 | + credential_type_hex = hexlify(credential_type.encode("utf-8") |
| 58 | + ).decode("ascii") |
| 59 | + logger.info("Encoded credential_type as hex: "+credential_type_hex.upper()) |
| 60 | + credential_type_hex = credential_type_hex.upper() |
| 61 | + |
| 62 | + if len(credential_type_hex) % 2 or \ |
| 63 | + not match(r"[0-9A-F]{2,128}", credential_type_hex): |
| 64 | + # Hexadecimal is always 2 chars per byte, so an odd length is invalid. |
| 65 | + raise ValueError("credential_type_hex must be 1-64 bytes as hexadecimal.") |
| 66 | + |
| 67 | + # Perform XRPL lookup of Credential ledger entry -------------------------- |
| 68 | + ledger_entry_request = LedgerEntry( |
| 69 | + credential={ |
| 70 | + "subject": subject, |
| 71 | + "issuer": issuer, |
| 72 | + "credential_type": credential_type_hex |
| 73 | + }, |
| 74 | + ledger_index="validated" |
| 75 | + ) |
| 76 | + logger.info("Looking up credential...") |
| 77 | + logger.info(ledger_entry_request.to_dict()) |
| 78 | + xrpl_response = client.request(ledger_entry_request) |
| 79 | + |
| 80 | + if xrpl_response.status != "success": |
| 81 | + if xrpl_response.result["error"] == "entryNotFound": |
| 82 | + logger.info("Credential was not found") |
| 83 | + return False |
| 84 | + # Other errors, for example invalidly-specified addresses. |
| 85 | + raise XRPLLookupError(xrpl_response) |
| 86 | + |
| 87 | + credential = xrpl_response.result["node"] |
| 88 | + logger.info("Found credential:") |
| 89 | + logger.info(credential) |
| 90 | + |
| 91 | + # Confirm that the credential has been accepted --------------------------- |
| 92 | + lsfAccepted = 0x00010000 |
| 93 | + if not credential["Flags"] & lsfAccepted: |
| 94 | + logger.info("Credential is not accepted.") |
| 95 | + return False |
| 96 | + |
| 97 | + # Confirm that the credential is not expired ------------------------------ |
| 98 | + if credential.get("Expiration"): |
| 99 | + expiration_time = ripple_time_to_datetime(credential["Expiration"]) |
| 100 | + logger.info("Credential has expiration: "+expiration_time.isoformat()) |
| 101 | + logger.info("Looking up validated ledger to check for expiration.") |
| 102 | + |
| 103 | + ledger_response = client.request(Ledger(ledger_index="validated")) |
| 104 | + if ledger_response.status != "success": |
| 105 | + raise XRPLLookupError(ledger_response) |
| 106 | + close_time = ripple_time_to_datetime( |
| 107 | + ledger_response.result["ledger"]["close_time"] |
| 108 | + ) |
| 109 | + logger.info("Most recent validated ledger is: "+close_time.isoformat()) |
| 110 | + |
| 111 | + if close_time > expiration_time: |
| 112 | + logger.info("Credential is expired.") |
| 113 | + return False |
| 114 | + |
| 115 | + # Credential has passed all checks. --------------------------------------- |
| 116 | + logger.info("Credential is valid.") |
| 117 | + return True |
| 118 | + |
| 119 | +# Commandline usage ----------------------------------------------------------- |
| 120 | +if __name__=="__main__": |
| 121 | + NETWORKS = { |
| 122 | + # JSON-RPC URLs of public servers |
| 123 | + "devnet": "https://s.devnet.rippletest.net:51234/", |
| 124 | + "testnet": "https://s.altnet.rippletest.net:51234/", |
| 125 | + "mainnet": "https://xrplcluster.com/" |
| 126 | + } |
| 127 | + |
| 128 | + # Parse arguments --------------------------------------------------------- |
| 129 | + parser = argparse.ArgumentParser(description="Verify an XRPL credential") |
| 130 | + parser.add_argument("issuer", type=str, nargs="?", |
| 131 | + help="Credential issuer address as base58.", |
| 132 | + default="rEzikzbnH6FQJ2cCr4Bqmf6c3jyWLzkonS") |
| 133 | + parser.add_argument("subject", type=str, nargs="?", |
| 134 | + help="Credential subject (holder) address as base58.", |
| 135 | + default="rsYhHbanGpnYe3M6bsaMeJT5jnLTfDEzoA") |
| 136 | + parser.add_argument("credential_type", type=str, nargs="?", |
| 137 | + help="Credential type as string", |
| 138 | + default="my_credential") |
| 139 | + parser.add_argument("-b", "--binary", action="store_true", |
| 140 | + help="Use binary (hexadecimal) for credential_type") |
| 141 | + parser.add_argument("-n", "--network", choices=NETWORKS.keys(), |
| 142 | + help="Use the specified network for lookup", |
| 143 | + default="devnet") |
| 144 | + parser.add_argument("-q", "--quiet", action="store_true", |
| 145 | + help="Don't print log messages.") |
| 146 | + args = parser.parse_args() |
| 147 | + |
| 148 | + # Call verify_credential with appropriate args ---------------------------- |
| 149 | + client = JsonRpcClient(NETWORKS[args.network]) |
| 150 | + if not args.quiet: |
| 151 | + # Use INFO level by default when called from the commandline. |
| 152 | + logger.setLevel(logging.INFO) |
| 153 | + |
| 154 | + if args.binary: |
| 155 | + result = verify_credential(client, |
| 156 | + issuer=args.issuer, |
| 157 | + subject=args.subject, |
| 158 | + credential_type_hex=args.credential_type) |
| 159 | + else: |
| 160 | + result = verify_credential(client, |
| 161 | + issuer=args.issuer, |
| 162 | + subject=args.subject, |
| 163 | + credential_type=args.credential_type) |
| 164 | + |
| 165 | + # Return a nonzero exit code if credential verification failed. ----------- |
| 166 | + if not result: |
| 167 | + exit(1) |
0 commit comments