|
| 1 | +# The MIT License (MIT) |
| 2 | + |
| 3 | +# Copyright (c) 2021-2024 Krux contributors |
| 4 | + |
| 5 | +# Permission is hereby granted, free of charge, to any person obtaining a copy |
| 6 | +# of this software and associated documentation files (the "Software"), to deal |
| 7 | +# in the Software without restriction, including without limitation the rights |
| 8 | +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| 9 | +# copies of the Software, and to permit persons to whom the Software is |
| 10 | +# furnished to do so, subject to the following conditions: |
| 11 | + |
| 12 | +# The above copyright notice and this permission notice shall be included in |
| 13 | +# all copies or substantial portions of the Software. |
| 14 | + |
| 15 | +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 16 | +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 17 | +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| 18 | +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 19 | +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 20 | +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| 21 | +# THE SOFTWARE. |
| 22 | + |
| 23 | +# This code an adaptation of Coinkite's BBQr python implementation for Krux environment |
| 24 | +# https://github.com/coinkite/BBQr |
| 25 | + |
| 26 | +import gc |
| 27 | + |
| 28 | +# BBQR |
| 29 | +KNOWN_ENCODINGS = {"H", "2", "Z"} |
| 30 | + |
| 31 | +# File types |
| 32 | +# P='PSBT', T='Transaction', J='JSON', C='CBOR' |
| 33 | +# U='Unicode Text', X='Executable', B='Binary' |
| 34 | +KNOWN_FILETYPES = {"P", "T", "J", "U"} |
| 35 | + |
| 36 | +BBQR_ALWAYS_COMPRESS_THRESHOLD = 5000 # bytes |
| 37 | + |
| 38 | +B32CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" |
| 39 | +assert len(B32CHARS) == 32 |
| 40 | + |
| 41 | +BBQR_PREFIX_LENGTH = 8 |
| 42 | + |
| 43 | +QR_CAPACITY_ALPHANUMERIC = [ |
| 44 | + 25, |
| 45 | + 47, |
| 46 | + 77, |
| 47 | + 114, |
| 48 | + 154, |
| 49 | + 195, |
| 50 | + 224, |
| 51 | + 279, |
| 52 | + 335, |
| 53 | + 395, |
| 54 | + 468, |
| 55 | + 535, |
| 56 | + 619, |
| 57 | + 667, |
| 58 | + 758, |
| 59 | + 854, |
| 60 | + 938, |
| 61 | + 1046, |
| 62 | + 1153, |
| 63 | + 1249, |
| 64 | +] |
| 65 | + |
| 66 | +class BBQrCode: |
| 67 | + """A BBQr code, containing the data, encoding, and file type""" |
| 68 | + |
| 69 | + |
| 70 | + def __init__(self, payload, encoding=None, file_type=None): |
| 71 | + """Initializes the BBQr code with the given data, encoding, and file type""" |
| 72 | + |
| 73 | + if encoding not in KNOWN_ENCODINGS: |
| 74 | + raise ValueError("Invalid BBQr encoding") |
| 75 | + if file_type not in KNOWN_FILETYPES: |
| 76 | + raise ValueError("Invalid BBQr file type") |
| 77 | + self.payload = payload |
| 78 | + self.encoding = encoding |
| 79 | + self.file_type = file_type |
| 80 | + |
| 81 | + def find_min_num_parts(self, max_width): |
| 82 | + qr_capacity = self.max_qr_bytes(max_width) |
| 83 | + data_length = len(self.payload) |
| 84 | + max_part_size = qr_capacity - BBQR_PREFIX_LENGTH |
| 85 | + if data_length < max_part_size: |
| 86 | + return 1, data_length |
| 87 | + # Round max_part_size to the nearest lower multiple of 8 |
| 88 | + max_part_size = (max_part_size // 8) * 8 |
| 89 | + # Calculate the number of parts required (rounded up) |
| 90 | + num_parts = (data_length + max_part_size - 1) // max_part_size |
| 91 | + # Calculate the optimal part size |
| 92 | + part_size = data_length // num_parts |
| 93 | + # Round to the nearest higher multiple of 8 |
| 94 | + part_size = ((part_size + 7) // 8) * 8 |
| 95 | + # Check if the part size is within the limits |
| 96 | + if part_size > max_part_size: |
| 97 | + num_parts += 1 |
| 98 | + part_size = data_length // num_parts |
| 99 | + # Round to the nearest higher multiple of 8 again |
| 100 | + part_size = ((part_size + 7) // 8) * 8 |
| 101 | + return num_parts, part_size |
| 102 | + |
| 103 | + def max_qr_bytes(self, max_width): |
| 104 | + """Calculates the maximum length, in bytes, a QR code of a given size can store""" |
| 105 | + # Given qr_size = 17 + 4 * version + 2 * frame_size |
| 106 | + max_width -= 2 # Subtract frame width |
| 107 | + qr_version = (max_width - 17) // 4 |
| 108 | + capacity_list = QR_CAPACITY_ALPHANUMERIC |
| 109 | + |
| 110 | + try: |
| 111 | + return capacity_list[qr_version - 1] |
| 112 | + except: |
| 113 | + # Limited to version 20 |
| 114 | + return capacity_list[-1] |
| 115 | + |
| 116 | + def to_qr_code(self, max_width): |
| 117 | + num_parts, part_size = self.find_min_num_parts(max_width) |
| 118 | + part_index = 0 |
| 119 | + while True: |
| 120 | + header = "B$%s%s%s%s" % ( |
| 121 | + self.encoding, |
| 122 | + self.file_type, |
| 123 | + int2base36(num_parts), |
| 124 | + int2base36(part_index), |
| 125 | + ) |
| 126 | + part = None |
| 127 | + if part_index == num_parts - 1: |
| 128 | + part = header + self.payload[part_index * part_size :] |
| 129 | + part_index = 0 |
| 130 | + else: |
| 131 | + part = ( |
| 132 | + header |
| 133 | + + self.payload[ |
| 134 | + part_index * part_size : (part_index + 1) * part_size |
| 135 | + ] |
| 136 | + ) |
| 137 | + part_index += 1 |
| 138 | + yield (part, num_parts) |
| 139 | + |
| 140 | + def parse(self, data): |
| 141 | + part, index, total = parse_bbqr(data) |
| 142 | + self.parts[index] = part |
| 143 | + self.total = total |
| 144 | + return index |
| 145 | + |
| 146 | + |
| 147 | + |
| 148 | +def parse_bbqr(data): |
| 149 | + """ |
| 150 | + Parses the QR as a BBQR part, extracting the part's content, |
| 151 | + encoding, file format, index, and total |
| 152 | + """ |
| 153 | + if len(data) < 8: |
| 154 | + raise ValueError("Invalid BBQR format") |
| 155 | + |
| 156 | + encoding = data[2] |
| 157 | + if encoding not in KNOWN_ENCODINGS: |
| 158 | + raise ValueError("Invalid encoding") |
| 159 | + |
| 160 | + file_type = data[3] |
| 161 | + if file_type not in KNOWN_FILETYPES: |
| 162 | + raise ValueError("Invalid file type") |
| 163 | + |
| 164 | + try: |
| 165 | + part_total = int(data[4:6], 36) |
| 166 | + part_index = int(data[6:8], 36) |
| 167 | + except ValueError: |
| 168 | + raise ValueError("Invalid BBQR format") |
| 169 | + |
| 170 | + if part_index >= part_total: |
| 171 | + raise ValueError("Invalid part index") |
| 172 | + |
| 173 | + return data[8:], part_index, part_total |
| 174 | + |
| 175 | + |
| 176 | +def deflate_compress(data): |
| 177 | + """Compresses the given data using deflate module""" |
| 178 | + try: |
| 179 | + import deflate |
| 180 | + from io import BytesIO |
| 181 | + |
| 182 | + stream = BytesIO() |
| 183 | + with deflate.DeflateIO(stream) as d: |
| 184 | + d.write(data) |
| 185 | + return stream.getvalue() |
| 186 | + except Exception as e: |
| 187 | + print(e) |
| 188 | + raise ValueError("Error compressing BBQR") |
| 189 | + |
| 190 | + |
| 191 | +def deflate_decompress(data): |
| 192 | + """Decompresses the given data using deflate module""" |
| 193 | + try: |
| 194 | + import deflate |
| 195 | + from io import BytesIO |
| 196 | + |
| 197 | + with deflate.DeflateIO(BytesIO(data)) as d: |
| 198 | + return d.read() |
| 199 | + except: |
| 200 | + raise ValueError("Error decompressing BBQR") |
| 201 | + |
| 202 | + |
| 203 | +def decode_bbqr(parts, encoding, file_type): |
| 204 | + """Decodes the given data as BBQR, returning the decoded data""" |
| 205 | + |
| 206 | + if encoding == "H": |
| 207 | + from binascii import unhexlify |
| 208 | + |
| 209 | + data_bytes = bytearray() |
| 210 | + for _, part in sorted(parts.items()): |
| 211 | + data_bytes.extend(unhexlify(part)) |
| 212 | + return bytes(data_bytes) |
| 213 | + |
| 214 | + binary_data = b"" |
| 215 | + for _, part in sorted(parts.items()): |
| 216 | + padding = (8 - (len(part) % 8)) % 8 |
| 217 | + padded_part = part + (padding * "=") |
| 218 | + binary_data += base32_decode_stream(padded_part) |
| 219 | + |
| 220 | + if encoding == "Z": |
| 221 | + if file_type in "JU": |
| 222 | + return deflate_decompress(binary_data).decode("utf-8") |
| 223 | + return deflate_decompress(binary_data) |
| 224 | + if file_type in "JU": |
| 225 | + return binary_data.decode("utf-8") |
| 226 | + return binary_data |
| 227 | + |
| 228 | + |
| 229 | +def encode_bbqr(data, encoding="Z", file_type="P"): |
| 230 | + """Encodes the given data as BBQR, returning the encoded data and format""" |
| 231 | + |
| 232 | + if encoding == "H": |
| 233 | + from binascii import hexlify |
| 234 | + |
| 235 | + data = hexlify(data).decode() |
| 236 | + return BBQrCode(data.upper(), encoding, file_type) |
| 237 | + |
| 238 | + if encoding == "Z": |
| 239 | + if len(data) > BBQR_ALWAYS_COMPRESS_THRESHOLD: |
| 240 | + # RAM won't be enough to have both compressed and not compressed data |
| 241 | + # It will always be beneficial to compress large data |
| 242 | + data = deflate_compress(data) |
| 243 | + else: |
| 244 | + # Check if compression is beneficial |
| 245 | + cmp = deflate_compress(data) |
| 246 | + if len(cmp) >= len(data): |
| 247 | + encoding = "2" |
| 248 | + else: |
| 249 | + encoding = "Z" |
| 250 | + data = cmp |
| 251 | + |
| 252 | + data = data.encode("utf-8") if isinstance(data, str) else data |
| 253 | + gc.collect() |
| 254 | + return BBQrCode("".join(base32_encode_stream(data)), encoding, file_type) |
| 255 | + |
| 256 | + |
| 257 | +# Base 32 encoding/decoding, used in BBQR only |
| 258 | + |
| 259 | + |
| 260 | +def base32_decode_stream(encoded_str): |
| 261 | + """Decodes a Base32 string""" |
| 262 | + base32_index = {ch: index for index, ch in enumerate(B32CHARS)} |
| 263 | + |
| 264 | + # Strip padding |
| 265 | + encoded_str = encoded_str.rstrip("=") |
| 266 | + |
| 267 | + buffer = 0 |
| 268 | + bits_left = 0 |
| 269 | + decoded_bytes = bytearray() |
| 270 | + |
| 271 | + for char in encoded_str: |
| 272 | + if char not in base32_index: |
| 273 | + raise ValueError("Invalid Base32 character: %s" % char) |
| 274 | + index = base32_index[char] |
| 275 | + buffer = (buffer << 5) | index |
| 276 | + bits_left += 5 |
| 277 | + |
| 278 | + while bits_left >= 8: |
| 279 | + bits_left -= 8 |
| 280 | + decoded_bytes.append((buffer >> bits_left) & 0xFF) |
| 281 | + buffer &= (1 << bits_left) - 1 # Keep only the remaining bits |
| 282 | + |
| 283 | + return bytes(decoded_bytes) |
| 284 | + |
| 285 | + |
| 286 | +def base32_encode_stream(data, add_padding=False): |
| 287 | + """A streaming base32 encoder""" |
| 288 | + buffer = 0 |
| 289 | + bits_left = 0 |
| 290 | + |
| 291 | + for byte in data: |
| 292 | + buffer = (buffer << 8) | byte |
| 293 | + bits_left += 8 |
| 294 | + |
| 295 | + while bits_left >= 5: |
| 296 | + bits_left -= 5 |
| 297 | + yield B32CHARS[(buffer >> bits_left) & 0x1F] |
| 298 | + buffer &= (1 << bits_left) - 1 # Keep only the remaining bits |
| 299 | + |
| 300 | + if bits_left > 0: |
| 301 | + buffer <<= 5 - bits_left |
| 302 | + yield B32CHARS[buffer & 0x1F] |
| 303 | + |
| 304 | + # Padding |
| 305 | + if add_padding: |
| 306 | + encoded_length = (len(data) * 8 + 4) // 5 |
| 307 | + padding_length = (8 - (encoded_length % 8)) % 8 |
| 308 | + for _ in range(padding_length): |
| 309 | + yield "=" |
| 310 | + |
| 311 | + |
| 312 | +def int2base36(n): |
| 313 | + """Convert integer n to a base36 string.""" |
| 314 | + if not 0 <= n <= 1295: # ensure the number is within the valid range |
| 315 | + raise ValueError("Number out of range") |
| 316 | + |
| 317 | + def tostr(x): |
| 318 | + """Convert integer x to a base36 character.""" |
| 319 | + return chr(48 + x) if x < 10 else chr(65 + x - 10) |
| 320 | + |
| 321 | + quotient, remainder = divmod(n, 36) |
| 322 | + return tostr(quotient) + tostr(remainder) |
| 323 | + |
0 commit comments