|
| 1 | +import logging |
| 2 | +from io import BytesIO |
| 3 | +from threading import RLock |
| 4 | +from typing import List, Optional |
| 5 | + |
| 6 | +import pypdfium2 as pdfium |
| 7 | +import pypdfium2.raw as pdfium_c |
| 8 | + |
| 9 | +from mindee.extraction import attach_images_as_new_file |
| 10 | +from mindee.image_operations.image_compressor import compress_image |
| 11 | +from mindee.pdf.pdf_char_data import PDFCharData |
| 12 | +from mindee.pdf.pdf_utils import extract_text_from_pdf, has_source_text |
| 13 | + |
# Module-level logger used for the warnings emitted during compression.
logger = logging.getLogger(__name__)
# Lowest image quality the compression loop tries before giving up.
MIN_QUALITY = 1
| 16 | + |
| 17 | + |
def compress_pdf(
    pdf_data: bytes,
    image_quality: int = 85,
    force_source_text_compression: bool = False,
    disable_source_text: bool = True,
) -> bytes:
    """
    Compresses each page of a provided PDF buffer.

    When the input contains source text and ``force_source_text_compression``
    is false, nothing is compressed and the input is returned unchanged. The
    input is also returned unchanged when ``compress_pdf_pages`` cannot produce
    a smaller result.

    :param pdf_data: The input PDF as bytes.
    :param image_quality: Compression quality (70-100 for most JPG images).
    :param force_source_text_compression: If true, compresses the file even when it contains source text.
    :param disable_source_text: If true, doesn't re-apply source text to the output PDF.
    :return: Compressed PDF as bytes, or the original bytes when compression is skipped or fails.
    """
    if has_source_text(pdf_data):
        if not force_source_text_compression:
            logger.warning(
                "Found text inside of the provided PDF file. Compression operation aborted since "
                "force_source_text_compression is set to 'false'."
            )
            return pdf_data

        if disable_source_text:
            logger.warning(
                "Source file contains text, but disable_source_text flag "
                "is set to true. Resulting file will not contain any embedded text."
            )
        else:
            logger.warning("Re-writing PDF source-text is an EXPERIMENTAL feature.")

    # Text is only extracted when it is going to be re-applied afterwards.
    extracted_text = None if disable_source_text else extract_text_from_pdf(pdf_data)

    compressed_pages = compress_pdf_pages(
        pdf_data, extracted_text, image_quality, disable_source_text
    )

    if not compressed_pages:
        logger.warning(
            "Could not compress PDF to a smaller size. Returning original PDF."
        )
        return pdf_data

    out_pdf = attach_images_as_new_file(
        [BytesIO(compressed_page) for compressed_page in compressed_pages]
    )
    out_bytes = BytesIO()
    out_pdf.save(out_bytes)

    # getvalue() returns the whole buffer regardless of the stream position.
    # A plain read() here would start after the data that save() just wrote
    # and therefore return an empty bytes object.
    return out_bytes.getvalue()
| 70 | + |
| 71 | + |
def compress_pdf_pages(
    pdf_data: bytes,
    extracted_text: Optional[List[PDFCharData]],
    image_quality: int,
    disable_source_text: bool,
) -> Optional[List[bytes]]:
    """
    Searches for a quality setting at which the compressed pages beat the original size.

    Starting from ``image_quality``, the pages are compressed at progressively
    lower qualities until the result is judged smaller than the input or the
    quality drops below ``MIN_QUALITY``.

    :param pdf_data: The input PDF as bytes.
    :param extracted_text: Extracted text from the PDF.
    :param image_quality: Initial compression quality.
    :param disable_source_text: If true, doesn't re-apply source text to the output PDF.
    :return: List of compressed page buffers, or None if no attempt was small enough.
    """
    source_size = len(pdf_data)
    current_quality = image_quality

    while current_quality >= MIN_QUALITY:
        candidate_pages = compress_pages_with_quality(
            pdf_data, extracted_text, current_quality, disable_source_text
        )
        candidate_size = sum(len(page) for page in candidate_pages)

        # The size check is made against the initially requested quality,
        # not the quality used for the current attempt.
        if is_compression_successful(candidate_size, source_size, image_quality):
            return candidate_pages

        # The decrement shrinks as the quality gets lower.
        quality_step = round(lerp(1, 10, current_quality / 100))
        current_quality -= quality_step

    return None
| 104 | + |
| 105 | + |
def add_text_to_pdf_page(  # type: ignore
    page: pdfium.PdfPage,
    extracted_text: Optional[List[PDFCharData]],
) -> None:
    """
    Adds text to a PDF page based on the extracted text data.

    One text object is created per entry of ``extracted_text`` and inserted at
    the entry's position, then the page content is regenerated. Does nothing
    when ``extracted_text`` is empty or None.

    :param page: The PdfPage object to add text to.
    :param extracted_text: List of PDFCharData objects containing text and positioning information.
    """
    if not extracted_text:
        return

    height = page.get_height()
    document = page.pdf
    # This lock is created on every call, so it does not serialize access
    # between separate calls of this function.
    pdfium_lock = RLock()

    with pdfium_lock:
        text_handler = pdfium_c.FPDFText_LoadPage(page.raw)
        try:
            for char_data in extracted_text:
                font = document.load_font(
                    char_data.font_name, pdfium_c.FPDF_FONT_TRUETYPE, True
                )
                text_object = document.create_text_object(font, char_data.font_size)
                text_object.set_text(char_data.char)
                # The vertical coordinate is flipped relative to the page height.
                x = char_data.left
                y = height - char_data.bottom
                text_object.set_position(x, y)
                r, g, b, a = char_data.font_fill_color
                text_object.set_fill_color(r, g, b, a)
                # NOTE(review): the handle from FPDFText_LoadPage is passed to
                # the FPDFPage_* calls - confirm these accept that handle type.
                pdfium_c.FPDFPage_InsertObject(text_handler, text_object)
            pdfium_c.FPDFPage_GenerateContent(text_handler)
        finally:
            # Release the handle even when inserting an object fails.
            pdfium_c.FPDFText_ClosePage(text_handler)
| 141 | + |
| 142 | + |
def compress_pages_with_quality(
    pdf_data: bytes,
    extracted_text: Optional[List[PDFCharData]],
    image_quality: int,
    disable_source_text: bool,
) -> List[bytes]:
    """
    Compresses pages with a specific quality.

    Each page is rasterized and the resulting image is passed through
    ``compress_image``; the returned buffers are those compressed images.

    :param pdf_data: The input PDF as bytes.
    :param extracted_text: Extracted text from the PDF.
    :param image_quality: Compression quality.
    :param disable_source_text: If true, doesn't re-apply source text to the output PDF.
    :return: List of compressed page buffers, one per page.
    """
    pdf_document = pdfium.PdfDocument(pdf_data)
    compressed_pages = []

    # Iterate over the pages themselves: enumerate() yields (index, page)
    # tuples, and such a tuple cannot be used to index the document.
    for page in pdf_document:
        rasterized_page = rasterize_page(page, image_quality)
        compressed_image = compress_image(rasterized_page, image_quality)

        if not disable_source_text:
            # The text is added to the source page after it was rasterized,
            # so it is not part of the image appended below.
            add_text_to_pdf_page(page, extracted_text)

        compressed_pages.append(compressed_image)

    return compressed_pages
| 172 | + |
| 173 | + |
def is_compression_successful(
    total_compressed_size: int, original_size: int, image_quality: int
) -> bool:
    """
    Tells whether the compressed pages, plus an estimated overhead, are smaller than the original.

    The overhead is a fraction of the compressed size that is interpolated
    from 0.54 at quality 0 down to 0.18 at quality 100.

    :param total_compressed_size: Total size of compressed pages.
    :param original_size: Original PDF size.
    :param image_quality: Compression quality.
    :return: True if the estimated size is strictly below the original size, false otherwise.
    """
    quality_ratio = image_quality / 100
    overhead = lerp(0.54, 0.18, quality_ratio)
    estimated_size = total_compressed_size + total_compressed_size * overhead
    return estimated_size < original_size
| 187 | + |
| 188 | + |
def rasterize_page(  # type: ignore
    page: pdfium.PdfPage,
    quality: int = 85,
) -> bytes:
    """
    Renders a PDF page and encodes the result as JPEG.

    :param page: PdfPage object to rasterize.
    :param quality: Quality passed to the JPEG encoder.
    :return: JPEG-encoded page as bytes.
    """
    rendered_bitmap = page.render()
    pil_image = rendered_bitmap.to_pil()

    jpeg_stream = BytesIO()
    pil_image.save(jpeg_stream, format="JPEG", quality=quality)
    return jpeg_stream.getvalue()
| 204 | + |
| 205 | + |
def lerp(start: float, end: float, t: float) -> float:
    """
    Linearly interpolates between two numbers.

    :param start: Value returned when ``t`` is 0.
    :param end: Value returned when ``t`` is 1.
    :param t: The interpolation factor (0 to 1).
    :return: The interpolated value.
    """
    weighted_start = start * (1 - t)
    weighted_end = end * t
    return weighted_start + weighted_end
0 commit comments